package org.go.scheduler.database.delegate;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.math.BigDecimal;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.go.domain.GoDomain;
import org.go.expcetion.JobPersistenceException;
import org.go.expcetion.NoSuchDelegateException;
import org.go.matchers.GroupMatcher;
import org.go.scheduler.database.CalendarIntervalTriggerPersistenceDelegate;
import org.go.scheduler.database.CronTriggerPersistenceDelegate;
import org.go.scheduler.database.FiredTriggerRecord;
import org.go.scheduler.database.SchedulerStateRecord;
import org.go.scheduler.database.SimpleTriggerPersistenceDelegate;
import org.go.scheduler.database.TriggerPersistenceDelegate;
import org.go.scheduler.database.TriggerPersistenceDelegate.TriggerPropertyBundle;
import org.go.scheduler.database.TriggerStatus;
import org.go.scheduler.database.util.StdJDBCConstants;
import org.go.spi.ClassLoadHelper;
import org.go.trigger.Calendar;
import org.go.trigger.Trigger;
import org.go.trigger.TriggerBuilder;
import org.go.trigger.TriggerKey;
import org.go.util.Key;
import org.go.util.Util;
import org.go.work.Work;
import org.go.work.WorkDataMap;
import org.go.work.WorkKey;
import org.slf4j.Logger;

/**
 * <p>
 * This is meant to be an abstract base class for most, if not all, <code>{@link DriverDelegate}</code>
 * implementations. Subclasses should override only those methods that need
 * special handling for the DBMS driver in question.
 * </p>
 * 
 * @author <a href="mailto:jeff@binaryfeed.org">Jeffrey Wescott</a>
 * @author James House
 * @author Eric Mueller
 */
public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants {
	/**
	 * Quietly closes a <code>ResultSet</code>. A <code>null</code> argument is
	 * accepted, and any <code>SQLException</code> raised by the close is discarded.
	 * 
	 * @param rs
	 *          the result set to close, may be <code>null</code>
	 */
	protected static void closeResultSet(ResultSet rs) {
		if (rs == null) {
			return;
		}
		try {
			rs.close();
		} catch (SQLException ignored) {
			// best-effort cleanup: a failed close must not replace the caller's outcome
		}
	}

	/**
	 * Quietly closes a <code>Statement</code>. A <code>null</code> argument is
	 * accepted, and any <code>SQLException</code> raised by the close is discarded.
	 * 
	 * @param statement
	 *          the statement to close, may be <code>null</code>
	 */
	protected static void closeStatement(Statement statement) {
		if (statement == null) {
			return;
		}
		try {
			statement.close();
		} catch (SQLException ignored) {
			// best-effort cleanup: a failed close must not replace the caller's outcome
		}
	}

	// Cached single-quoted form of schedName; built on first use by getSchedulerNameLiteral().
	private String schedNameLiteral = null;

	// Used by initialize(String) to load trigger persistence delegate classes by name.
	protected ClassLoadHelper classLoadHelper;

	// Instance id bound into the fired-trigger insert statement.
	protected String instanceId;

	// Logger supplied through the constructor; dereferenced without a null check when delegates are registered.
	protected Logger logger = null;

	// Scheduler name; handed to each trigger persistence delegate and substituted into queries by rtp().
	protected String schedName;

	// Table-name prefix substituted into queries by rtp(); both constructors overwrite this default.
	protected String tablePrefix = DEFAULT_TABLE_PREFIX;

	// Registered trigger persistence delegates, consulted in registration order.
	protected List<TriggerPersistenceDelegate> triggerPersistenceDelegates = new LinkedList<TriggerPersistenceDelegate>();

	// When true, job data is stored in java.util.Properties format rather than as a serialized object.
	protected boolean useProperties;

	/**
	 * <p>
	 * Create new StdJDBCDelegate instance and register the default trigger
	 * persistence delegates. The <code>useProperties</code> flag keeps its
	 * default value of <code>false</code>.
	 * </p>
	 * 
	 * @param logger
	 *          the logger to use during execution; it is dereferenced while the
	 *          default delegates are registered
	 * @param tablePrefix
	 *          the prefix of all table names
	 * @param schedName
	 *          the scheduler name passed on to every trigger persistence delegate
	 * @param instanceId
	 *          the instance id recorded with fired triggers
	 * @param classLoadHelper
	 *          the helper used to load delegate classes named in the init string
	 */
	public StdJDBCDelegate(Logger logger, String tablePrefix, String schedName, String instanceId, ClassLoadHelper classLoadHelper) {
		this.classLoadHelper = classLoadHelper;
		this.instanceId = instanceId;
		this.schedName = schedName;
		this.tablePrefix = tablePrefix;
		this.logger = logger;
		// Must run last: delegate registration reads logger, tablePrefix and schedName.
		addDefaultTriggerPersistenceDelegates();
	}

	/**
	 * <p>
	 * Create new StdJDBCDelegate instance with an explicit
	 * <code>useProperties</code> setting and register the default trigger
	 * persistence delegates.
	 * </p>
	 * 
	 * @param logger
	 *          the logger to use during execution; it is dereferenced while the
	 *          default delegates are registered
	 * @param tablePrefix
	 *          the prefix of all table names
	 * @param schedName
	 *          the scheduler name passed on to every trigger persistence delegate
	 * @param instanceId
	 *          the instance id recorded with fired triggers
	 * @param classLoadHelper
	 *          the helper used to load delegate classes named in the init string
	 * @param useProperties
	 *          whether job data is stored in properties format; <code>null</code>
	 *          is treated as <code>false</code>
	 */
	public StdJDBCDelegate(Logger logger, String tablePrefix, String schedName, String instanceId, ClassLoadHelper classLoadHelper, Boolean useProperties) {
		this.logger = logger;
		this.tablePrefix = tablePrefix;
		this.schedName = schedName;
		this.instanceId = instanceId;
		// A null Boolean used to fail with a NullPointerException on unboxing; fall back to the field default.
		this.useProperties = (useProperties != null) && useProperties.booleanValue();
		this.classLoadHelper = classLoadHelper;
		// Must run last: delegate registration reads logger, tablePrefix and schedName.
		addDefaultTriggerPersistenceDelegates();
	}

	/**
	 * Find the key of the first non-serializable value held by the given data
	 * map, by delegating to the <code>Map</code> based overload with the map
	 * returned from <code>getWrappedMap()</code>.
	 * 
	 * @param data
	 *          the data map to inspect, may be <code>null</code>
	 * @return the string form of the offending key, or <code>null</code> when
	 *         there is no data or every value is serializable
	 */
	private String getKeyOfNonSerializableValue(WorkDataMap data) {
		if (data == null) {
			return null;
		}
		// The static type Map<?, ?> selects the Map overload rather than recursing into this one.
		Map<?, ?> wrapped = data.getWrappedMap();
		if (wrapped == null) {
			return null;
		}
		Object offendingKey = getKeyOfNonSerializableValue(wrapped);
		return (offendingKey == null) ? null : String.valueOf(offendingKey);
	}

	/**
	 * Build a Map from the <code>java.util.Properties</code> encoded job data
	 * stored in the <code>COL_JOB_DATAMAP</code> column of the current row.
	 * The value returned by <code>getJobDataFromBlob</code> is cast to an
	 * <code>InputStream</code>, which is always closed after loading.
	 * 
	 * @param rs
	 *          the result set positioned on the row to read
	 * @return the decoded map, or <code>null</code> when the column yields no stream
	 */
	private Map<?, ?> getMapFromProperties(ResultSet rs) throws ClassNotFoundException, IOException, SQLException {
		final InputStream blobStream = (InputStream) getJobDataFromBlob(rs, COL_JOB_DATAMAP);
		if (blobStream == null) {
			return null;
		}

		final Properties loaded = new Properties();
		try {
			loaded.load(blobStream);
		} finally {
			blobStream.close();
		}
		return convertFromProperty(loaded);
	}

	/**
	 * Placeholder for building a <code>WorkKey</code> from a name and a group.
	 * Not implemented yet: both arguments are ignored.
	 * 
	 * @return always <code>null</code>
	 */
	private WorkKey jobKey(String name, String group) {
		// TODO Auto-generated method stub
		return null;
	}

	/**
	 * Placeholder for obtaining a fresh <code>TriggerBuilder</code>.
	 * Not implemented yet.
	 * 
	 * @return always <code>null</code>
	 */
	private TriggerBuilder newTrigger() {
		// TODO Auto-generated method stub
		return null;
	}

	/**
	 * Serialize the data map in <code>java.util.Properties</code> format.
	 * 
	 * @param data
	 *          the data map to store, may be <code>null</code>
	 * @return a stream holding the stored properties, or an empty stream when
	 *         <code>data</code> is <code>null</code>
	 * @throws IOException
	 *           if the map cannot be converted to properties or written
	 */
	private ByteArrayOutputStream serializeProperties(WorkDataMap data) throws IOException {
		final ByteArrayOutputStream out = new ByteArrayOutputStream();
		if (data == null) {
			return out;
		}

		final Properties converted = convertToProperty(data.getWrappedMap());
		converted.store(out, "");
		return out;
	}

	/**
	 * Copy the state properties carried by the bundle onto the trigger via
	 * <code>Util.setBeanProps</code>. Nothing happens when the bundle has no
	 * state property names.
	 * 
	 * @param trigger
	 *          the trigger whose bean properties are set
	 * @param props
	 *          the bundle supplying the property names and values
	 */
	private void setTriggerStateProperties(Trigger trigger, TriggerPersistenceDelegate.TriggerPropertyBundle props) throws JobPersistenceException {
		if (props.getStatePropertyNames() != null) {
			Util.setBeanProps(trigger, props.getStatePropertyNames(), props.getStatePropertyValues());
		}
	}

	/**
	 * Placeholder for building a <code>TriggerKey</code> from a trigger name
	 * and a group name. Not implemented yet: both arguments are ignored.
	 * NOTE(review): <code>hasMisfiredTriggersInState</code> adds this method's
	 * result to its output list, so that list currently receives
	 * <code>null</code> entries.
	 * 
	 * @return always <code>null</code>
	 */
	private TriggerKey triggerKey(String triggerName, String groupName) {
		// TODO Auto-generated method stub
		return null;
	}

	/**
	 * Register the built-in trigger persistence delegates, in this order:
	 * simple, cron, calendar-interval. Called from both constructors.
	 */
	protected void addDefaultTriggerPersistenceDelegates() {
		final TriggerPersistenceDelegate simple = new SimpleTriggerPersistenceDelegate();
		addTriggerPersistenceDelegate(simple);

		final TriggerPersistenceDelegate cron = new CronTriggerPersistenceDelegate();
		addTriggerPersistenceDelegate(cron);

		final TriggerPersistenceDelegate calendarInterval = new CalendarIntervalTriggerPersistenceDelegate();
		addTriggerPersistenceDelegate(calendarInterval);
	}

	/**
	 * @return the value of the <code>useProperties</code> flag, which selects
	 *         properties-format storage for job data
	 */
	protected boolean canUseProperties() {
		return this.useProperties;
	}

	/**
	 * Convert a <code>Properties</code> object into a plain map by copying its
	 * entries into a new <code>HashMap</code>.
	 * 
	 * @param properties
	 *          the properties to copy
	 * @return a new map holding the same entries
	 */
	protected Map<?, ?> convertFromProperty(Properties properties) throws IOException {
		final Map<Object, Object> copy = new HashMap<Object, Object>();
		copy.putAll(properties);
		return copy;
	}

	/**
	 * Convert a data map into a <code>Properties</code> object. Every key and
	 * every non-null value must be a <code>String</code>; a <code>null</code>
	 * value is stored as the empty string.
	 * 
	 * @param data
	 *          the map to convert
	 * @return the properties holding the converted entries
	 * @throws IOException
	 *           if a key or a value is not a <code>String</code>
	 */
	protected Properties convertToProperty(Map<?, ?> data) throws IOException {
		final Properties result = new Properties();

		for (Map.Entry<?, ?> item : data.entrySet()) {
			final Object name = item.getKey();
			final Object value = (item.getValue() != null) ? item.getValue() : "";

			if (!(name instanceof String)) {
				throw new IOException("JobDataMap keys/values must be Strings when the 'useProperties' property is set.  offending Key: " + name);
			}
			if (!(value instanceof String)) {
				throw new IOException("JobDataMap values must be Strings when the 'useProperties' property is set.  Key of offending value: " + name);
			}

			result.put(name, value);
		}

		return result;
	}

	/**
	 * Ask each registered trigger persistence delegate, in registration order,
	 * to delete its extended properties for the trigger. Stops at the first
	 * delegate that reports at least one deleted row.
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param triggerKey
	 *          the key of the trigger whose extension data is removed
	 */
	protected void deleteTriggerExtension(Connection conn, TriggerKey triggerKey) throws SQLException {
		final Iterator<TriggerPersistenceDelegate> remaining = triggerPersistenceDelegates.iterator();
		boolean rowDeleted = false;
		// as soon as one delegate affects a row, we're done.
		while (!rowDeleted && remaining.hasNext()) {
			rowDeleted = remaining.next().deleteExtendedTriggerProperties(conn, triggerKey) > 0;
		}
	}

	/**
	 * Retrieves the value of the designated column index in the current row as
	 * a <code>boolean</code>.
	 * This just wraps <code>{@link ResultSet#getBoolean(int)}</code>
	 * by default, but it can be overridden by subclass delegates for databases that
	 * don't explicitly support the boolean type.
	 * 
	 * @param rs
	 *          the result set positioned on the row to read
	 * @param columnIndex
	 *          the column index handed to the result set
	 * @return the column value as reported by the result set
	 */
	protected boolean getBoolean(ResultSet rs, int columnIndex) throws SQLException {
		final boolean columnValue = rs.getBoolean(columnIndex);
		return columnValue;
	}

	/**
	 * Retrieves the value of the designated column in the current row as
	 * a <code>boolean</code>.
	 * This just wraps <code>{@link ResultSet#getBoolean(java.lang.String)}</code>
	 * by default, but it can be overridden by subclass delegates for databases that
	 * don't explicitly support the boolean type.
	 * 
	 * @param rs
	 *          the result set positioned on the row to read
	 * @param columnName
	 *          the column name handed to the result set
	 * @return the column value as reported by the result set
	 */
	protected boolean getBoolean(ResultSet rs, String columnName) throws SQLException {
		final boolean columnValue = rs.getBoolean(columnName);
		return columnValue;
	}

	/**
	 * <p>
	 * This method should be overridden by any delegate subclasses that need
	 * special handling for BLOBs for job details. The default implementation
	 * uses standard JDBC <code>java.sql.Blob</code> operations.
	 * </p>
	 * <p>
	 * When properties-format storage is enabled the raw binary stream of the
	 * BLOB is returned (or <code>null</code> if the column has no BLOB);
	 * otherwise the BLOB is deserialized through <code>getObjectFromBlob</code>.
	 * </p>
	 * 
	 * @param rs
	 *          the result set, already queued to the correct row
	 * @param colName
	 *          the column name for the BLOB
	 * @return an <code>InputStream</code> over the BLOB in properties mode, the
	 *         deserialized Object otherwise
	 * @throws ClassNotFoundException
	 *           if a class found during deserialization cannot be found
	 * @throws IOException
	 *           if deserialization causes an error
	 */
	protected Object getJobDataFromBlob(ResultSet rs, String colName) throws ClassNotFoundException, IOException, SQLException {
		if (!canUseProperties()) {
			return getObjectFromBlob(rs, colName);
		}

		final Blob locator = rs.getBlob(colName);
		if (locator == null) {
			return null;
		}
		return locator.getBinaryStream();
	}

	/**
	 * Find the key of the first non-serializable value in the given Map, by
	 * attempting to serialize each value in iteration order.
	 * 
	 * @param data
	 *          the map whose values are probed
	 * @return The key of the first value whose serialization raised an
	 *         <code>IOException</code>, or null if all values are serializable.
	 */
	protected Object getKeyOfNonSerializableValue(Map<?, ?> data) {
		for (Map.Entry<?, ?> candidate : data.entrySet()) {
			ByteArrayOutputStream probe = null;
			try {
				probe = serializeObject(candidate.getValue());
			} catch (IOException notSerializable) {
				return candidate.getKey();
			} finally {
				if (probe != null) {
					try {
						probe.close();
					} catch (IOException ignored) {
						// the probe's content is discarded anyway
					}
				}
			}
		}

		// Only reached when every value serialized cleanly.
		return null;
	}

	/**
	 * <p>
	 * This method should be overridden by any delegate subclasses that need
	 * special handling for BLOBs. The default implementation uses standard
	 * JDBC <code>java.sql.Blob</code> operations and Java object
	 * deserialization.
	 * </p>
	 * 
	 * @param rs
	 *          the result set, already queued to the correct row
	 * @param colName
	 *          the column name for the BLOB
	 * @return the deserialized Object from the ResultSet BLOB, or
	 *         <code>null</code> when the BLOB is missing, has length zero, or
	 *         yields no readable bytes
	 * @throws ClassNotFoundException
	 *           if a class found during deserialization cannot be found
	 * @throws IOException
	 *           if deserialization causes an error
	 */
	protected Object getObjectFromBlob(ResultSet rs, String colName) throws ClassNotFoundException, IOException, SQLException {
		final Blob locator = rs.getBlob(colName);
		if (locator == null || locator.length() == 0) {
			return null;
		}

		final InputStream raw = locator.getBinaryStream();
		if (raw == null) {
			return null;
		}
		// An in-memory stream with nothing left to read has no object to offer.
		if (raw instanceof ByteArrayInputStream && ((ByteArrayInputStream) raw).available() == 0) {
			return null;
		}

		final ObjectInputStream reader = new ObjectInputStream(raw);
		try {
			return reader.readObject();
		} finally {
			reader.close();
		}
	}

	/**
	 * Return the scheduler name wrapped in single quotes. The literal is built
	 * on the first call and cached for later calls.
	 * NOTE(review): the name is not escaped before being quoted — confirm that
	 * scheduler names can never contain a single quote.
	 * 
	 * @return the quoted scheduler name
	 */
	protected String getSchedulerNameLiteral() {
		String literal = schedNameLiteral;
		if (literal == null) {
			literal = "'" + schedName + "'";
			schedNameLiteral = literal;
		}
		return literal;
	}

	/**
	 * <p>
	 * Prepare a query template for execution by handing it to
	 * <code>Util.rtp</code> together with the table prefix and the quoted
	 * scheduler name.
	 * </p>
	 * 
	 * @param query
	 *          the unsubstituted query
	 * @return the query as returned by <code>Util.rtp</code>
	 */
	protected final String rtp(String query) {
		final String schedulerLiteral = getSchedulerNameLiteral();
		return Util.rtp(query, tablePrefix, schedulerLiteral);
	}

	/**
	 * <p>
	 * Create a serialized <code>java.io.ByteArrayOutputStream</code>
	 * version of a <code>{@link WorkDataMap}</code>. In properties mode the map
	 * is stored in <code>java.util.Properties</code> format; otherwise it is
	 * written with Java object serialization.
	 * </p>
	 * 
	 * @param data
	 *          the data map to serialize
	 * @return the serialized ByteArrayOutputStream
	 * @throws IOException
	 *           if serialization causes an error; a
	 *           <code>NotSerializableException</code> names the key of the
	 *           first offending value and carries the original exception as its cause
	 */
	protected ByteArrayOutputStream serializeJobData(WorkDataMap data) throws IOException {
		if (canUseProperties()) {
			return serializeProperties(data);
		}

		try {
			return serializeObject(data);
		} catch (NotSerializableException e) {
			// Probe the wrapped map directly: the static type Map<?, ?> selects the overload
			// that actually inspects the values, so the message names the real key.
			Map<?, ?> wrapped = (data == null) ? null : data.getWrappedMap();
			Object offendingKey = (wrapped == null) ? null : getKeyOfNonSerializableValue(wrapped);

			NotSerializableException detailed = new NotSerializableException("Unable to serialize JobDataMap for insertion into " + "database because the value of property '" + offendingKey + "' is not serializable: " + e.getMessage());
			// NotSerializableException has no cause-taking constructor, so attach it afterwards.
			detailed.initCause(e);
			throw detailed;
		}
	}

	/**
	 * <p>
	 * Create a serialized <code>java.io.ByteArrayOutputStream</code>
	 * version of an Object using Java object serialization.
	 * </p>
	 * 
	 * @param obj
	 *          the object to serialize, may be <code>null</code>
	 * @return the serialized ByteArrayOutputStream; it is empty when
	 *         <code>obj</code> is <code>null</code>
	 * @throws IOException
	 *           if serialization causes an error
	 */
	protected ByteArrayOutputStream serializeObject(Object obj) throws IOException {
		final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		if (obj == null) {
			return buffer;
		}

		final ObjectOutputStream writer = new ObjectOutputStream(buffer);
		writer.writeObject(obj);
		writer.flush();
		return buffer;
	}

	/**
	 * Sets the designated parameter to the given Java <code>boolean</code> value.
	 * This just wraps <code>{@link PreparedStatement#setBoolean(int, boolean)}</code>
	 * by default, but it can be overridden by subclass delegates for databases that
	 * don't explicitly support the boolean type.
	 * 
	 * @param ps
	 *          the statement receiving the parameter
	 * @param index
	 *          the parameter index handed to the statement
	 * @param val
	 *          the value to bind
	 */
	protected void setBoolean(PreparedStatement ps, int index, boolean val) throws SQLException {
		final boolean bound = val;
		ps.setBoolean(index, bound);
	}

	/**
	 * Sets the designated parameter to the byte array of the given
	 * <code>ByteArrayOutputStream</code>.  Binds an empty byte array (not SQL
	 * NULL) if the <code>ByteArrayOutputStream</code> is null.
	 * This just wraps <code>{@link PreparedStatement#setBytes(int, byte[])}</code>
	 * by default, but it can be overridden by subclass delegates for databases that
	 * don't explicitly support storing bytes in this way.
	 * 
	 * @param ps
	 *          the statement receiving the parameter
	 * @param index
	 *          the parameter index handed to the statement
	 * @param baos
	 *          the stream whose content is bound, may be <code>null</code>
	 */
	protected void setBytes(PreparedStatement ps, int index, ByteArrayOutputStream baos) throws SQLException {
		final byte[] payload;
		if (baos != null) {
			payload = baos.toByteArray();
		} else {
			payload = new byte[0];
		}
		ps.setBytes(index, payload);
	}

	/**
	 * Translate a group matcher into the string bound to a group-name query
	 * parameter: the compare-to value itself for EQUALS, and the value with
	 * <code>%</code> added before and/or after it for CONTAINS, ENDS_WITH and
	 * STARTS_WITH. The compare-to value is used as is, without escaping.
	 * 
	 * @param matcher
	 *          the matcher to translate
	 * @return the parameter value for the matcher
	 * @throws UnsupportedOperationException
	 *           if the matcher uses any other operator
	 */
	protected String toSqlLikeClause(final GroupMatcher matcher) {
		switch (matcher.getCompareWithOperator()) {
		case EQUALS:
			return matcher.getCompareToValue();
		case CONTAINS:
			return "%" + matcher.getCompareToValue() + "%";
		case ENDS_WITH:
			return "%" + matcher.getCompareToValue();
		case STARTS_WITH:
			return matcher.getCompareToValue() + "%";
		default:
			throw new UnsupportedOperationException("Don't know how to translate " + matcher.getCompareWithOperator() + " into SQL");
		}
	}

	/**
	 * Register a trigger persistence delegate: log its type at debug level,
	 * initialize it with the table prefix and scheduler name, and append it to
	 * the list of delegates.
	 * 
	 * @param delegate
	 *          the delegate to initialize and register
	 */
	public void addTriggerPersistenceDelegate(TriggerPersistenceDelegate delegate) {
		final String delegateType = delegate.getClass().getCanonicalName();
		logger.debug("Adding TriggerPersistenceDelegate of type: " + delegateType);

		delegate.initialize(tablePrefix, schedName);
		triggerPersistenceDelegates.add(delegate);
	}

	/**
	 * <p>
	 * Check whether or not a calendar exists.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param calendarName
	 *          the name of the calendar
	 * @return true if the existence query returns a row, false otherwise
	 */
	@Override
	public boolean calendarExists(Connection conn, String calendarName) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet rows = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_CALENDAR_EXISTENCE));
			stmt.setString(1, calendarName);
			rows = stmt.executeQuery();

			return rows.next();
		} finally {
			closeResultSet(rows);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Check whether or not a calendar is referenced by any triggers.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param calendarName
	 *          the name of the calendar
	 * @return true if the reference query returns a row, false otherwise
	 */
	@Override
	public boolean calendarIsReferenced(Connection conn, String calendarName) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet rows = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_REFERENCED_CALENDAR));
			stmt.setString(1, calendarName);
			rows = stmt.executeQuery();

			return rows.next();
		} finally {
			closeResultSet(rows);
			closeStatement(stmt);
		}
	}

	/**
	 * Clear (delete!) all scheduling data by running, in order, the delete
	 * statements for simple triggers, simple-property triggers, cron triggers,
	 * blob triggers, triggers, job details, calendars and paused trigger groups.
	 * 
	 * @param conn
	 *          the DB Connection
	 * @throws SQLException
	 *           if any statement fails; the statement in use is still closed
	 */
	@Override
	public void clearData(Connection conn) throws SQLException {
		// Child tables first, base tables afterwards — the order below is the execution order.
		final String[] deletions = { DELETE_ALL_SIMPLE_TRIGGERS, DELETE_ALL_SIMPROP_TRIGGERS, DELETE_ALL_CRON_TRIGGERS, DELETE_ALL_BLOB_TRIGGERS, DELETE_ALL_TRIGGERS, DELETE_ALL_JOB_DETAILS, DELETE_ALL_CALENDARS, DELETE_ALL_PAUSED_TRIGGER_GRPS };

		PreparedStatement current = null;

		try {
			for (int i = 0; i < deletions.length; i++) {
				if (current != null) {
					// close the previous statement before preparing the next one
					current.close();
				}
				current = conn.prepareStatement(rtp(deletions[i]));
				current.executeUpdate();
			}
		} finally {
			closeStatement(current);
		}
	}

	//---------------------------------------------------------------------------
	// triggers
	//---------------------------------------------------------------------------

	/**
	 * <p>
	 * Get the number of triggers in the given state that have
	 * misfired - according to the given timestamp.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param state1
	 *          the trigger state bound as the second query parameter
	 * @param ts
	 *          the timestamp bound as the first query parameter
	 * @return the count read from the first column of the result
	 * @throws SQLException
	 *           if the query fails or returns no row
	 */
	@Override
	public int countMisfiredTriggersInState(Connection conn, String state1, long ts) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet rows = null;

		try {
			stmt = conn.prepareStatement(rtp(COUNT_MISFIRED_TRIGGERS_IN_STATE));
			stmt.setBigDecimal(1, BigDecimal.valueOf(ts));
			stmt.setString(2, state1);
			rows = stmt.executeQuery();

			if (!rows.next()) {
				throw new SQLException("No misfired trigger count returned.");
			}
			return rows.getInt(1);
		} finally {
			closeResultSet(rows);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete all paused trigger group records.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return the number of rows deleted
	 */
	@Override
	public int deleteAllPausedTriggerGroups(Connection conn) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(DELETE_PAUSED_TRIGGER_GROUPS));
			return stmt.executeUpdate();
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete the blob trigger data for a trigger.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param triggerKey
	 *          the key whose name and group identify the trigger
	 * @return the number of rows deleted
	 */
	public int deleteBlobTrigger(Connection conn, TriggerKey triggerKey) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(DELETE_BLOB_TRIGGER));
			stmt.setString(1, triggerKey.getName());
			stmt.setString(2, triggerKey.getGroup());

			final int deleted = stmt.executeUpdate();
			return deleted;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete a calendar.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param calendarName
	 *          the name of the calendar
	 * @return the number of rows deleted
	 */
	@Override
	public int deleteCalendar(Connection conn, String calendarName) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(DELETE_CALENDAR));
			stmt.setString(1, calendarName);

			final int deleted = stmt.executeUpdate();
			return deleted;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete a fired trigger.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param entryId
	 *          the id of the fired trigger entry to delete
	 * @return the number of rows deleted
	 */
	@Override
	public int deleteFiredTrigger(Connection conn, String entryId) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(DELETE_FIRED_TRIGGER));
			stmt.setString(1, entryId);

			final int deleted = stmt.executeUpdate();
			return deleted;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete all fired triggers.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return the number of rows deleted
	 */
	@Override
	public int deleteFiredTriggers(Connection conn) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(DELETE_FIRED_TRIGGERS));

			final int deleted = stmt.executeUpdate();
			return deleted;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete the fired triggers recorded for the given scheduler instance.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param instanceId
	 *          the instance id bound into the delete statement
	 * @return the number of rows deleted
	 */
	@Override
	public int deleteFiredTriggers(Connection conn, String instanceId) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(DELETE_INSTANCES_FIRED_TRIGGERS));
			stmt.setString(1, instanceId);

			final int deleted = stmt.executeUpdate();
			return deleted;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete the job detail record for the given job.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobKey
	 *          the key whose name and group identify the job
	 * @return the number of rows deleted
	 */
	@Override
	public int deleteJobDetail(Connection conn, WorkKey jobKey) throws SQLException {
		if (logger.isDebugEnabled()) {
			logger.debug("Deleting job: " + jobKey);
		}

		PreparedStatement stmt = null;
		try {
			stmt = conn.prepareStatement(rtp(DELETE_JOB_DETAIL));
			stmt.setString(1, jobKey.getName());
			stmt.setString(2, jobKey.getGroup());

			final int deleted = stmt.executeUpdate();
			return deleted;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete the paused trigger group records selected by the matcher. The
	 * matcher is translated with <code>toSqlLikeClause</code> and bound as the
	 * single statement parameter.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param matcher
	 *          the group matcher to translate
	 * @return the number of rows deleted
	 */
	@Override
	public int deletePausedTriggerGroup(Connection conn, GroupMatcher<TriggerKey> matcher) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(DELETE_PAUSED_TRIGGER_GROUP));
			stmt.setString(1, toSqlLikeClause(matcher));
			return stmt.executeUpdate();
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete the paused trigger group record for the given group name.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param groupName
	 *          the group name bound into the delete statement
	 * @return the number of rows deleted
	 */
	@Override
	public int deletePausedTriggerGroup(Connection conn, String groupName) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(DELETE_PAUSED_TRIGGER_GROUP));
			stmt.setString(1, groupName);
			return stmt.executeUpdate();
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete the scheduler state record of the given scheduler instance.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param instanceId
	 *          the instance id bound into the delete statement
	 * @return the number of rows deleted
	 */
	@Override
	public int deleteSchedulerState(Connection conn, String instanceId) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(DELETE_SCHEDULER_STATE));
			stmt.setString(1, instanceId);

			final int deleted = stmt.executeUpdate();
			return deleted;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Delete the base trigger data for a trigger. The trigger's extension data
	 * is removed first through <code>deleteTriggerExtension</code>.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param triggerKey
	 *          the key whose name and group identify the trigger
	 * @return the number of base trigger rows deleted
	 */
	@Override
	public int deleteTrigger(Connection conn, TriggerKey triggerKey) throws SQLException {
		// extension rows go before the base row they belong to
		deleteTriggerExtension(conn, triggerKey);

		PreparedStatement stmt = null;
		try {
			stmt = conn.prepareStatement(rtp(DELETE_TRIGGER));
			stmt.setString(1, triggerKey.getName());
			stmt.setString(2, triggerKey.getGroup());

			final int deleted = stmt.executeUpdate();
			return deleted;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * Find the first registered delegate whose handled trigger type
	 * discriminator equals the given one.
	 * 
	 * @param discriminator
	 *          the trigger type discriminator to look up
	 * @return the matching delegate, or <code>null</code> if none matches
	 */
	public TriggerPersistenceDelegate findTriggerPersistenceDelegate(String discriminator) {
		TriggerPersistenceDelegate match = null;
		final Iterator<TriggerPersistenceDelegate> remaining = triggerPersistenceDelegates.iterator();

		while (match == null && remaining.hasNext()) {
			final TriggerPersistenceDelegate candidate = remaining.next();
			if (candidate.getHandledTriggerTypeDiscriminator().equals(discriminator)) {
				match = candidate;
			}
		}

		return match;
	}

	/**
	 * Find the first registered delegate that reports it can handle the given
	 * trigger's type.
	 * 
	 * @param trigger
	 *          the trigger to find a delegate for
	 * @return the matching delegate, or <code>null</code> if none can handle it
	 */
	public TriggerPersistenceDelegate findTriggerPersistenceDelegate(Trigger trigger) {
		TriggerPersistenceDelegate match = null;
		final Iterator<TriggerPersistenceDelegate> remaining = triggerPersistenceDelegates.iterator();

		while (match == null && remaining.hasNext()) {
			final TriggerPersistenceDelegate candidate = remaining.next();
			if (candidate.canHandleTriggerType(trigger)) {
				match = candidate;
			}
		}

		return match;
	}

	/**
	 * <p>
	 * Get the keys of the triggers in the given state that have
	 * misfired - according to the given timestamp.  Rows are appended to
	 * <code>resultList</code> until its size equals <code>count</code>.
	 * </p>
	 * <p>
	 * NOTE(review): keys are built with this class's private
	 * <code>triggerKey</code> helper, which is currently an unimplemented stub.
	 * </p>
	 * 
	 * @param conn The DB Connection
	 * @param state1 The trigger state bound as the second query parameter
	 * @param ts The timestamp bound as the first query parameter
	 * @param count The most misfired triggers to return, negative for all
	 * @param resultList Output parameter.  A List of 
	 *      <code>{@link TriggerKey}</code> objects.  Must not be null.
	 *          
	 * @return Whether there are more misfired triggers left to find beyond
	 *         the given count.
	 */
	@Override
	public boolean hasMisfiredTriggersInState(Connection conn, String state1, long ts, int count, List<TriggerKey> resultList) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet rows = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_HAS_MISFIRED_TRIGGERS_IN_STATE));
			stmt.setBigDecimal(1, BigDecimal.valueOf(ts));
			stmt.setString(2, state1);
			rows = stmt.executeQuery();

			boolean limitReached = false;
			while (rows.next() && !limitReached) {
				if (resultList.size() != count) {
					final String triggerName = rows.getString(COL_TRIGGER_NAME);
					final String groupName = rows.getString(COL_TRIGGER_GROUP);
					resultList.add(triggerKey(triggerName, groupName));
				} else {
					// a further row exists although the list is already full
					limitReached = true;
				}
			}

			return limitReached;
		} finally {
			closeResultSet(rows);
			closeStatement(stmt);
		}
	}

	/**
	 * Apply the settings of an init string. initStrings are of the format:
	 * 
	 * settingName=settingValue|otherSettingName=otherSettingValue|...
	 * 
	 * The only recognized setting is
	 * <code>triggerPersistenceDelegateClasses</code>, a comma separated list of
	 * class names; each class is loaded, instantiated and registered as a
	 * trigger persistence delegate. Settings without a value are skipped.
	 * 
	 * @param initString
	 *          the init string, may be <code>null</code> (nothing happens)
	 * @throws NoSuchDelegateException
	 *           if a setting with a value has an unknown name, or a delegate
	 *           class cannot be loaded or instantiated
	 */
	@Override
	public void initialize(String initString) throws NoSuchDelegateException {
		if (initString == null)
			return;

		String[] settings = initString.split("\\|");

		for (String setting : settings) {
			String[] parts = setting.split("=");
			// A setting of just "=" splits to an empty array, so test the length before
			// reading the name; it has no value and is skipped like any valueless setting.
			if (parts.length < 2 || parts[1] == null || parts[1].equals(""))
				continue;
			String name = parts[0];

			if (name.equals("triggerPersistenceDelegateClasses")) {

				String[] trigDelegates = parts[1].split(",");

				for (String trigDelClassName : trigDelegates) {
					try {
						Class<?> trigDelClass = classLoadHelper.loadClass(trigDelClassName);
						addTriggerPersistenceDelegate((TriggerPersistenceDelegate) trigDelClass.newInstance());
					} catch (Exception e) {
						throw new NoSuchDelegateException("Error instantiating TriggerPersistenceDelegate of type: " + trigDelClassName, e);
					}
				}
			} else
				throw new NoSuchDelegateException("Unknown setting: '" + name + "'");
		}
	}

	/**
	 * <p>
	 * Insert the blob trigger data: the trigger is written with Java object
	 * serialization and stored together with its name and group.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param trigger
	 *          the trigger to insert
	 * @return the number of rows inserted
	 * @throws IOException
	 *           if the trigger cannot be serialized
	 */
	public int insertBlobTrigger(Connection conn, Trigger trigger) throws SQLException, IOException {
		// serialize the trigger before touching the database
		final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		final ObjectOutputStream writer = new ObjectOutputStream(buffer);
		writer.writeObject(trigger);
		writer.close();

		final byte[] payload = buffer.toByteArray();
		final ByteArrayInputStream payloadStream = new ByteArrayInputStream(payload);

		PreparedStatement stmt = null;
		try {
			stmt = conn.prepareStatement(rtp(INSERT_BLOB_TRIGGER));
			stmt.setString(1, trigger.getKey().getName());
			stmt.setString(2, trigger.getKey().getGroup());
			stmt.setBinaryStream(3, payloadStream, payload.length);

			return stmt.executeUpdate();
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Insert a new calendar. The calendar is serialized before the statement
	 * is prepared.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param calendarName
	 *          the name for the new calendar
	 * @param calendar
	 *          the calendar
	 * @return the number of rows inserted
	 * @throws IOException
	 *           if there were problems serializing the calendar
	 */
	@Override
	public int insertCalendar(Connection conn, String calendarName, Calendar calendar) throws IOException, SQLException {
		final ByteArrayOutputStream serialized = serializeObject(calendar);

		PreparedStatement stmt = null;
		try {
			stmt = conn.prepareStatement(rtp(INSERT_CALENDAR));
			stmt.setString(1, calendarName);
			setBytes(stmt, 2, serialized);

			final int inserted = stmt.executeUpdate();
			return inserted;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Insert a fired trigger.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param trigger
	 *          the trigger; its next fire time must not be null
	 * @param state
	 *          the state that the trigger should be stored in
	 * @param job
	 *          the job fired by the trigger, or <code>null</code>; when null the
	 *          job name and group are stored as NULL and the job flags as false
	 * @return the number of rows inserted
	 */
	@Override
	public int insertFiredTrigger(Connection conn, Trigger trigger, String state, Work job) throws SQLException {
		PreparedStatement ps = null;
		try {
			ps = conn.prepareStatement(rtp(INSERT_FIRED_TRIGGER));
			ps.setString(1, trigger.getFireInstanceId());
			ps.setString(2, trigger.getKey().getName());
			ps.setString(3, trigger.getKey().getGroup());
			ps.setString(4, instanceId);
			ps.setBigDecimal(5, new BigDecimal(String.valueOf(trigger.getNextFireTime().getTime())));
			ps.setString(6, state);
			if (job != null) {
				ps.setString(7, trigger.getJobKey().getName());
				ps.setString(8, trigger.getJobKey().getGroup());
				setBoolean(ps, 9, job.isConcurrentExectionDisallowed());
			} else {
				ps.setString(7, null);
				ps.setString(8, null);
				setBoolean(ps, 9, false);
			}
			// Parameter 10 must be bound on every path; it used to be left unset when a
			// job was supplied, which makes the statement fail at execution.
			// NOTE(review): no recovery flag is read from Work here, so false is stored — confirm.
			setBoolean(ps, 10, false);
			ps.setInt(11, trigger.getPriority());

			return ps.executeUpdate();
		} finally {
			closeStatement(ps);
		}
	}

	/**
	 * <p>
	 * Insert the job detail record.
	 * </p>
	 * <p>
	 * NOTE(review): the durable, persist-data-after-execution and
	 * requests-recovery flags are not read from the job and are stored as
	 * <code>false</code>, and no job data is serialized (an empty byte array is
	 * stored) — confirm the intended source of these values.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param job
	 *          the job to insert
	 * @return number of rows inserted
	 * @throws IOException
	 *           declared for the job data serialization step, which is currently disabled
	 */
	@Override
	public int insertJobDetail(Connection conn, Work job) throws IOException, SQLException {
		// Job data serialization is disabled, so setBytes receives null below.
		ByteArrayOutputStream baos = null;

		PreparedStatement ps = null;

		int insertResult = 0;

		try {
			ps = conn.prepareStatement(rtp(INSERT_JOB_DETAIL));
			ps.setString(1, job.getKey().getName());
			ps.setString(2, job.getKey().getGroup());
			ps.setString(3, job.getDescription());
			ps.setString(4, job.getWorkClass().getName());
			// Parameters 5, 7 and 8 were never bound, so the statement could not execute.
			// Bind false until the real flags are available.
			setBoolean(ps, 5, false);
			setBoolean(ps, 6, job.isConcurrentExectionDisallowed());
			setBoolean(ps, 7, false);
			setBoolean(ps, 8, false);
			setBytes(ps, 9, baos);

			insertResult = ps.executeUpdate();
		} finally {
			closeStatement(ps);
		}

		return insertResult;
	}

	/**
	 * <p>
	 * Insert a paused trigger group record for the given group name.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param groupName
	 *          the group name bound into the insert statement
	 * @return the number of rows inserted
	 */
	@Override
	public int insertPausedTriggerGroup(Connection conn, String groupName) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(INSERT_PAUSED_TRIGGER_GROUP));
			stmt.setString(1, groupName);
			return stmt.executeUpdate();
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Insert a scheduler state record.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param instanceId
	 *          the instance id, bound as the first parameter
	 * @param checkInTime
	 *          the check-in time, bound as the second parameter
	 * @param interval
	 *          the check-in interval, bound as the third parameter
	 * @return the number of rows inserted
	 */
	@Override
	public int insertSchedulerState(Connection conn, String instanceId, long checkInTime, long interval) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(INSERT_SCHEDULER_STATE));
			stmt.setString(1, instanceId);
			stmt.setLong(2, checkInTime);
			stmt.setLong(3, interval);

			final int inserted = stmt.executeUpdate();
			return inserted;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Insert the base trigger data, followed by the type-specific data: through
	 * the matching trigger persistence delegate when one handles the trigger,
	 * otherwise as a serialized blob trigger.
	 * </p>
	 * <p>
	 * A missing next fire time is stored as NULL, a missing previous fire time
	 * as -1 and a missing end time as 0. The trigger's data map is serialized
	 * only when it is not empty.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param trigger
	 *          the trigger to insert
	 * @param state
	 *          the state that the trigger should be stored in
	 * @param jobDetail
	 *          the job, passed through to the trigger persistence delegate
	 * @return the number of base trigger rows inserted
	 * @throws IOException
	 *           if the data map or the blob trigger cannot be serialized
	 */
	@Override
	public int insertTrigger(Connection conn, Trigger trigger, String state, Work jobDetail) throws SQLException, IOException {
		final ByteArrayOutputStream dataBytes = (trigger.getJobDataMap().size() > 0) ? serializeJobData(trigger.getJobDataMap()) : null;

		PreparedStatement stmt = null;
		int inserted = 0;

		try {
			stmt = conn.prepareStatement(rtp(INSERT_TRIGGER));
			stmt.setString(1, trigger.getKey().getName());
			stmt.setString(2, trigger.getKey().getGroup());
			stmt.setString(3, trigger.getJobKey().getName());
			stmt.setString(4, trigger.getJobKey().getGroup());
			stmt.setString(5, trigger.getDescription());

			if (trigger.getNextFireTime() == null) {
				stmt.setBigDecimal(6, null);
			} else {
				stmt.setBigDecimal(6, new BigDecimal(String.valueOf(trigger.getNextFireTime().getTime())));
			}

			// -1 marks a trigger that has not fired before
			final long prevFireTime = (trigger.getPreviousFireTime() != null) ? trigger.getPreviousFireTime().getTime() : -1;
			stmt.setBigDecimal(7, new BigDecimal(String.valueOf(prevFireTime)));
			stmt.setString(8, state);

			// no delegate means the trigger is stored as a blob
			final TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(trigger);
			final String type = (tDel != null) ? tDel.getHandledTriggerTypeDiscriminator() : TTYPE_BLOB;
			stmt.setString(9, type);

			stmt.setBigDecimal(10, new BigDecimal(String.valueOf(trigger.getStartTime().getTime())));

			// 0 marks a trigger without an end time
			final long endTime = (trigger.getEndTime() != null) ? trigger.getEndTime().getTime() : 0;
			stmt.setBigDecimal(11, new BigDecimal(String.valueOf(endTime)));
			stmt.setString(12, trigger.getCalendarName());
			stmt.setInt(13, trigger.getMisfireInstruction());
			setBytes(stmt, 14, dataBytes);
			stmt.setInt(15, trigger.getPriority());

			inserted = stmt.executeUpdate();

			if (tDel != null) {
				tDel.insertExtendedTriggerProperties(conn, trigger, state, jobDetail);
			} else {
				insertBlobTrigger(conn, trigger);
			}
		} finally {
			closeStatement(stmt);
		}

		return inserted;
	}

	/**
	 * <p>
	 * Check whether the <code>SELECT_NUM_TRIGGERS_IN_GROUP</code> query reports
	 * a positive count for the given group name.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param groupName
	 *          the trigger group name bound to the query
	 * @return true if the query returns a row whose first column is greater
	 *         than zero, false otherwise
	 */
	@Override
	public boolean isExistingTriggerGroup(Connection conn, String groupName) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_NUM_TRIGGERS_IN_GROUP));
			stmt.setString(1, groupName);
			results = stmt.executeQuery();

			return results.next() && results.getInt(1) > 0;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Check whether or not the given job is flagged as non-concurrent.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobKey
	 *          the key (name and group) of the job to look up
	 * @return the value of the <code>COL_IS_NONCONCURRENT</code> column if the
	 *         job exists, false if no row is found
	 */
	@Override
	public boolean isJobNonConcurrent(Connection conn, WorkKey jobKey) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_JOB_NONCONCURRENT));
			stmt.setString(1, jobKey.getName());
			stmt.setString(2, jobKey.getGroup());
			results = stmt.executeQuery();

			if (results.next()) {
				return getBoolean(results, COL_IS_NONCONCURRENT);
			}
			return false;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Check whether the <code>SELECT_PAUSED_TRIGGER_GROUP</code> query returns
	 * a row for the given group name.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param groupName
	 *          the trigger group name bound to the query
	 * @return true if at least one row is returned, false otherwise
	 */
	@Override
	public boolean isTriggerGroupPaused(Connection conn, String groupName) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_PAUSED_TRIGGER_GROUP));
			stmt.setString(1, groupName);
			results = stmt.executeQuery();

			boolean rowFound = results.next();
			return rowFound;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Check whether or not the given job exists.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobKey
	 *          the key (name and group) of the job to look up
	 * @return true if the existence query returns a row, false otherwise
	 */
	@Override
	public boolean jobExists(Connection conn, WorkKey jobKey) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_JOB_EXISTENCE));
			stmt.setString(1, jobKey.getName());
			stmt.setString(2, jobKey.getGroup());
			results = stmt.executeQuery();

			return results.next();
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select a calendar by name.
	 * </p>
	 * 
	 * <p>
	 * A warning is logged when no calendar object could be obtained.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param calendarName
	 *          the name of the calendar
	 * @return the Calendar read from the <code>COL_CALENDAR</code> blob, or
	 *         null if none was found
	 * @throws ClassNotFoundException
	 *           if a class needed during deserialization cannot be found
	 * @throws IOException
	 *           if there were problems deserializing the calendar
	 */
	@Override
	public Calendar selectCalendar(Connection conn, String calendarName) throws ClassNotFoundException, IOException, SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;
		try {
			stmt = conn.prepareStatement(rtp(SELECT_CALENDAR));
			stmt.setString(1, calendarName);
			results = stmt.executeQuery();

			Calendar found = null;
			if (results.next()) {
				found = (Calendar) getObjectFromBlob(results, COL_CALENDAR);
			}

			if (found == null) {
				logger.warn("Couldn't find calendar with name '" + calendarName + "'.");
			}
			return found;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the names of all of the stored calendars.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return a List of <code>String</code> calendar names, taken from the
	 *         first column of each result row
	 */
	@Override
	public List<String> selectCalendars(Connection conn) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_CALENDARS));
			results = stmt.executeQuery();

			List<String> calendarNames = new LinkedList<String>();
			for (; results.next();) {
				calendarNames.add(results.getString(1));
			}
			return calendarNames;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the distinct instance names of all fired-trigger records.
	 * </p>
	 * 
	 * <p>
	 * This is useful when trying to identify orphaned fired triggers (a fired
	 * trigger without a scheduler state record).
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return a Set of String instance names
	 */
	@Override
	public Set<String> selectFiredTriggerInstanceNames(Connection conn) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;
		try {
			Set<String> names = new HashSet<String>();

			stmt = conn.prepareStatement(rtp(SELECT_FIRED_TRIGGER_INSTANCE_NAMES));
			results = stmt.executeQuery();

			for (; results.next();) {
				String instanceName = results.getString(COL_INSTANCE_NAME);
				names.add(instanceName);
			}
			return names;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the fired-trigger records for a given trigger, or for a whole
	 * trigger group if the trigger name is <code>null</code>.
	 * </p>
	 * 
	 * <p>
	 * The job-related fields of a record are only populated when its state is
	 * not <code>STATE_ACQUIRED</code>.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param triggerName
	 *          the trigger name, or <code>null</code> to select by group only
	 * @param groupName
	 *          the trigger group name
	 * @return a List of FiredTriggerRecord objects.
	 */
	@Override
	public List<FiredTriggerRecord> selectFiredTriggerRecords(Connection conn, String triggerName, String groupName) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;
		try {
			List<FiredTriggerRecord> records = new LinkedList<FiredTriggerRecord>();

			if (triggerName == null) {
				stmt = conn.prepareStatement(rtp(SELECT_FIRED_TRIGGER_GROUP));
				stmt.setString(1, groupName);
			} else {
				stmt = conn.prepareStatement(rtp(SELECT_FIRED_TRIGGER));
				stmt.setString(1, triggerName);
				stmt.setString(2, groupName);
			}
			results = stmt.executeQuery();

			for (; results.next();) {
				FiredTriggerRecord record = new FiredTriggerRecord();

				record.setFireInstanceId(results.getString(COL_ENTRY_ID));
				record.setFireInstanceState(results.getString(COL_ENTRY_STATE));
				record.setFireTimestamp(results.getLong(COL_FIRED_TIME));
				record.setPriority(results.getInt(COL_PRIORITY));
				record.setSchedulerInstanceId(results.getString(COL_INSTANCE_NAME));
				record.setTriggerKey(triggerKey(results.getString(COL_TRIGGER_NAME), results.getString(COL_TRIGGER_GROUP)));

				// job columns are read only for records that are past the acquired state
				boolean acquiredOnly = record.getFireInstanceState().equals(STATE_ACQUIRED);
				if (!acquiredOnly) {
					record.setJobDisallowsConcurrentExecution(getBoolean(results, COL_IS_NONCONCURRENT));
					record.setJobRequestsRecovery(results.getBoolean(COL_REQUESTS_RECOVERY));
					record.setJobKey(jobKey(results.getString(COL_JOB_NAME), results.getString(COL_JOB_GROUP)));
				}

				records.add(record);
			}

			return records;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the fired-trigger records for a given job, or for a whole job
	 * group if the job name is <code>null</code>.
	 * </p>
	 * 
	 * <p>
	 * The job-related fields of a record are only populated when its state is
	 * not <code>STATE_ACQUIRED</code>.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobName
	 *          the job name, or <code>null</code> to select by group only
	 * @param groupName
	 *          the job group name
	 * @return a List of FiredTriggerRecord objects.
	 */
	@Override
	public List<FiredTriggerRecord> selectFiredTriggerRecordsByJob(Connection conn, String jobName, String groupName) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;
		try {
			List<FiredTriggerRecord> records = new LinkedList<FiredTriggerRecord>();

			if (jobName == null) {
				stmt = conn.prepareStatement(rtp(SELECT_FIRED_TRIGGERS_OF_JOB_GROUP));
				stmt.setString(1, groupName);
			} else {
				stmt = conn.prepareStatement(rtp(SELECT_FIRED_TRIGGERS_OF_JOB));
				stmt.setString(1, jobName);
				stmt.setString(2, groupName);
			}
			results = stmt.executeQuery();

			for (; results.next();) {
				FiredTriggerRecord record = new FiredTriggerRecord();

				record.setFireInstanceId(results.getString(COL_ENTRY_ID));
				record.setFireInstanceState(results.getString(COL_ENTRY_STATE));
				record.setFireTimestamp(results.getLong(COL_FIRED_TIME));
				record.setPriority(results.getInt(COL_PRIORITY));
				record.setSchedulerInstanceId(results.getString(COL_INSTANCE_NAME));
				record.setTriggerKey(triggerKey(results.getString(COL_TRIGGER_NAME), results.getString(COL_TRIGGER_GROUP)));

				// job columns are read only for records that are past the acquired state
				boolean acquiredOnly = record.getFireInstanceState().equals(STATE_ACQUIRED);
				if (!acquiredOnly) {
					record.setJobDisallowsConcurrentExecution(getBoolean(results, COL_IS_NONCONCURRENT));
					record.setJobRequestsRecovery(results.getBoolean(COL_REQUESTS_RECOVERY));
					record.setJobKey(jobKey(results.getString(COL_JOB_NAME), results.getString(COL_JOB_GROUP)));
				}

				records.add(record);
			}

			return records;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the fired-trigger records returned by the
	 * <code>SELECT_INSTANCES_FIRED_TRIGGERS</code> query for the given
	 * instance name.
	 * </p>
	 * 
	 * <p>
	 * The job-related fields of a record are only populated when its state is
	 * not <code>STATE_ACQUIRED</code>.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param instanceName
	 *          the scheduler instance name bound to the query
	 * @return a List of FiredTriggerRecord objects.
	 */
	@Override
	public List<FiredTriggerRecord> selectInstancesFiredTriggerRecords(Connection conn, String instanceName) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;
		try {
			List<FiredTriggerRecord> records = new LinkedList<FiredTriggerRecord>();

			stmt = conn.prepareStatement(rtp(SELECT_INSTANCES_FIRED_TRIGGERS));
			stmt.setString(1, instanceName);
			results = stmt.executeQuery();

			for (; results.next();) {
				FiredTriggerRecord record = new FiredTriggerRecord();

				record.setFireInstanceId(results.getString(COL_ENTRY_ID));
				record.setFireInstanceState(results.getString(COL_ENTRY_STATE));
				record.setFireTimestamp(results.getLong(COL_FIRED_TIME));
				record.setSchedulerInstanceId(results.getString(COL_INSTANCE_NAME));
				record.setTriggerKey(triggerKey(results.getString(COL_TRIGGER_NAME), results.getString(COL_TRIGGER_GROUP)));

				// job columns are read only for records that are past the acquired state
				boolean acquiredOnly = record.getFireInstanceState().equals(STATE_ACQUIRED);
				if (!acquiredOnly) {
					record.setJobDisallowsConcurrentExecution(getBoolean(results, COL_IS_NONCONCURRENT));
					record.setJobRequestsRecovery(results.getBoolean(COL_REQUESTS_RECOVERY));
					record.setJobKey(jobKey(results.getString(COL_JOB_NAME), results.getString(COL_JOB_GROUP)));
				}

				record.setPriority(results.getInt(COL_PRIORITY));
				records.add(record);
			}

			return records;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the job (<code>Work</code>) stored under a given job name / group
	 * name.
	 * </p>
	 * 
	 * <p>
	 * NOTE(review): this method is only partially implemented. The code that
	 * constructs the <code>Work</code> instance is commented out, so
	 * <code>job</code> is still <code>null</code> when
	 * <code>job.setDescription(...)</code> is called; whenever the query finds
	 * a row this throws a <code>NullPointerException</code>. The data map read
	 * from the row is also never applied to the job.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobKey
	 *          the key (name and group) of the job to look up
	 * @param loadHelper
	 *          class-load helper; currently unused because the construction
	 *          code that referenced it is commented out
	 * @return <code>null</code> when no matching row exists (see the review
	 *         note above for the row-found case)
	 * @throws ClassNotFoundException
	 *           if a class found during deserialization cannot be found or if
	 *           the job class could not be found
	 * @throws IOException
	 *           if deserialization causes an error
	 */
	@Override
	public Work selectJobDetail(Connection conn, WorkKey jobKey, ClassLoadHelper loadHelper) throws ClassNotFoundException, IOException, SQLException {
		PreparedStatement ps = null;
		ResultSet rs = null;

		try {
			ps = conn.prepareStatement(rtp(SELECT_JOB_DETAIL));
			ps.setString(1, jobKey.getName());
			ps.setString(2, jobKey.getGroup());
			rs = ps.executeQuery();

			Work job = null;

			if (rs.next()) {
				// FIXME: the Work instance is never created (construction below is commented out)
				job = null;
				//new Work((Go) loadHelper.loadClass(rs.getString(COL_JOB_CLASS)).newInstance());
				//job.setId(rs.getString(COL_JOB_NAME));
				//	job.setGroup(rs.getString(COL_JOB_GROUP));
				// FIXME: job is null here, so this line throws NullPointerException
				job.setDescription(rs.getString(COL_DESCRIPTION));
				//job.setJobClass();
				//job.setDurability(getBoolean(rs, COL_IS_DURABLE));
				//job.setRequestsRecovery(getBoolean(rs, COL_REQUESTS_RECOVERY));

				// The data map comes either from the properties representation or from a serialized blob.
				Map<?, ?> map = null;
				if (canUseProperties()) {
					map = getMapFromProperties(rs);
				} else {
					map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
				}

				if (null != map) {
					// TODO: the loaded map is currently discarded
					//job.setJobDataMap(new WorkDataMap(map));
				}
			}

			return job;
		} finally {
			closeResultSet(rs);
			closeStatement(ps);
		}
	}

	/**
	 * <p>
	 * Run the <code>SELECT_JOB_EXECUTION_COUNT</code> query for the given job.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobKey
	 *          the key (name and group) of the job
	 * @return the first column of the result row, or 0 if the query returns no
	 *         row
	 */
	@Override
	public int selectJobExecutionCount(Connection conn, WorkKey jobKey) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_JOB_EXECUTION_COUNT));
			stmt.setString(1, jobKey.getName());
			stmt.setString(2, jobKey.getGroup());

			results = stmt.executeQuery();

			if (results.next()) {
				return results.getInt(1);
			}
			return 0;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the job to which the trigger is associated.
	 * </p>
	 * 
	 * <p>
	 * NOTE(review): this method is only partially implemented. The code that
	 * builds the <code>Work</code> from the result row is commented out, so the
	 * method currently returns <code>null</code> both when a row is found and
	 * when none is found; callers cannot tell the two cases apart.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param loadHelper
	 *          class-load helper; currently unused because the construction
	 *          code that referenced it is commented out
	 * @param triggerKey
	 *          the key (name and group) of the trigger
	 * @return currently always <code>null</code> (see the review note above)
	 * @throws SQLException
	 * @throws ClassNotFoundException
	 */
	@Override
	public Work selectJobForTrigger(Connection conn, ClassLoadHelper loadHelper, TriggerKey triggerKey) throws ClassNotFoundException, SQLException {
		PreparedStatement ps = null;
		ResultSet rs = null;

		try {
			ps = conn.prepareStatement(rtp(SELECT_JOB_FOR_TRIGGER));
			ps.setString(1, triggerKey.getName());
			ps.setString(2, triggerKey.getGroup());
			rs = ps.executeQuery();

			if (rs.next()) {
				// FIXME: the Work instance is never created, so null is returned even though a row exists
				Work job = null;
				/*new Work((Go) loadHelper.loadClass(rs.getString(4)).newInstance());
				job.setName(rs.getString(1));
				job.setGroup(rs.getString(2));
				job.setDurability(getBoolean(rs, 3));
				job.setRequestsRecovery(getBoolean(rs, 5));
				*/
				return job;
			} else {
				// No row: log at debug level only and report "not found" as null.
				if (logger.isDebugEnabled()) {
					logger.debug("No job for trigger '" + triggerKey + "'.");
				}
				return null;
			}
		} finally {
			closeResultSet(rs);
			closeStatement(ps);
		}
	}

	//---------------------------------------------------------------------------
	// calendars
	//---------------------------------------------------------------------------

	/**
	 * <p>
	 * Select all of the job group names that are stored.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return a List of <code>String</code> group names, taken from the first
	 *         column of each result row
	 */
	@Override
	public List<String> selectJobGroups(Connection conn) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_JOB_GROUPS));
			results = stmt.executeQuery();

			List<String> groupNames = new LinkedList<String>();
			for (; results.next();) {
				groupNames.add(results.getString(1));
			}
			return groupNames;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the keys of all of the jobs whose group matches the given
	 * matcher.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param matcher
	 *          the groupMatcher to evaluate the jobs against; it is converted
	 *          to a SQL LIKE clause via <code>toSqlLikeClause</code>
	 * @return a Set of <code>WorkKey</code> objects built from the first two
	 *         columns of each result row
	 */
	@Override
	public Set<WorkKey> selectJobsInGroup(Connection conn, GroupMatcher<WorkKey> matcher) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_JOBS_IN_GROUP));
			stmt.setString(1, toSqlLikeClause(matcher));
			results = stmt.executeQuery();

			List<WorkKey> keys = new LinkedList<WorkKey>();
			for (; results.next();) {
				String name = results.getString(1);
				String group = results.getString(2);
				keys.add(jobKey(name, group));
			}

			Set<WorkKey> uniqueKeys = new HashSet<WorkKey>(keys);
			return uniqueKeys;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Get the keys of all of the triggers that have misfired.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param ts
	 *          the timestamp bound to the query's first parameter (presumably
	 *          the misfire cutoff in milliseconds; confirm against the SQL)
	 * @return a List of <code>TriggerKey</code> objects
	 */
	@Override
	public List<TriggerKey> selectMisfiredTriggers(Connection conn, long ts) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_MISFIRED_TRIGGERS));
			stmt.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
			results = stmt.executeQuery();

			List<TriggerKey> misfired = new LinkedList<TriggerKey>();
			for (; results.next();) {
				String name = results.getString(COL_TRIGGER_NAME);
				String group = results.getString(COL_TRIGGER_GROUP);
				misfired.add(triggerKey(name, group));
			}
			return misfired;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Get the keys of all of the triggers in the given group and state that
	 * have misfired.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param groupName
	 *          the trigger group name; also used as the group of every
	 *          returned key
	 * @param state
	 *          the trigger state to match
	 * @param ts
	 *          the timestamp bound to the query's first parameter (presumably
	 *          the misfire cutoff in milliseconds; confirm against the SQL)
	 * @return a List of <code>TriggerKey</code> objects
	 */
	@Override
	public List<TriggerKey> selectMisfiredTriggersInGroupInState(Connection conn, String groupName, String state, long ts) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_MISFIRED_TRIGGERS_IN_GROUP_IN_STATE));
			stmt.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
			stmt.setString(2, groupName);
			stmt.setString(3, state);
			results = stmt.executeQuery();

			List<TriggerKey> misfired = new LinkedList<TriggerKey>();
			for (; results.next();) {
				// only the name is read back; the group is the one that was queried for
				misfired.add(triggerKey(results.getString(COL_TRIGGER_NAME), groupName));
			}
			return misfired;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Get the keys of all of the triggers in the given state that have
	 * misfired.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param state
	 *          the trigger state to match
	 * @param ts
	 *          the timestamp bound to the query's first parameter (presumably
	 *          the misfire cutoff in milliseconds; confirm against the SQL)
	 * @return a List of <code>TriggerKey</code> objects
	 */
	@Override
	public List<TriggerKey> selectMisfiredTriggersInState(Connection conn, String state, long ts) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_MISFIRED_TRIGGERS_IN_STATE));
			stmt.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
			stmt.setString(2, state);
			results = stmt.executeQuery();

			List<TriggerKey> misfired = new LinkedList<TriggerKey>();
			for (; results.next();) {
				String name = results.getString(COL_TRIGGER_NAME);
				String group = results.getString(COL_TRIGGER_GROUP);
				misfired.add(triggerKey(name, group));
			}
			return misfired;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the next time that a trigger in the <code>STATE_WAITING</code>
	 * state will be fired.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return the next fire time, or 0 if the query returns no row
	 * 
	 * @deprecated Does not account for misfires.
	 */
	@Deprecated
	public long selectNextFireTime(Connection conn) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;
		try {
			stmt = conn.prepareStatement(rtp(SELECT_NEXT_FIRE_TIME));
			stmt.setString(1, STATE_WAITING);
			results = stmt.executeQuery();

			if (!results.next()) {
				return 0L;
			}
			return results.getLong(ALIAS_COL_NEXT_FIRE_TIME);
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the total number of calendars stored.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return the first column of the count query's result row, or 0 if the
	 *         query returns no row
	 */
	@Override
	public int selectNumCalendars(Connection conn) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_NUM_CALENDARS));
			results = stmt.executeQuery();

			return results.next() ? results.getInt(1) : 0;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the total number of jobs stored.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return the first column of the count query's result row, or 0 if the
	 *         query returns no row
	 */
	@Override
	public int selectNumJobs(Connection conn) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_NUM_JOBS));
			results = stmt.executeQuery();

			return results.next() ? results.getInt(1) : 0;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the total number of triggers stored.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return the first column of the count query's result row, or 0 if the
	 *         query returns no row
	 */
	@Override
	public int selectNumTriggers(Connection conn) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_NUM_TRIGGERS));
			results = stmt.executeQuery();

			return results.next() ? results.getInt(1) : 0;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the number of triggers associated with a given job.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobKey
	 *          the key (name and group) of the job
	 * @return the number of triggers for the given job, or 0 if the query
	 *         returns no row
	 */
	@Override
	public int selectNumTriggersForJob(Connection conn, WorkKey jobKey) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_NUM_TRIGGERS_FOR_JOB));
			stmt.setString(1, jobKey.getName());
			stmt.setString(2, jobKey.getGroup());
			results = stmt.executeQuery();

			return results.next() ? results.getInt(1) : 0;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the names of all trigger groups returned by the
	 * <code>SELECT_PAUSED_TRIGGER_GROUPS</code> query.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return a Set of String group names read from the
	 *         <code>COL_TRIGGER_GROUP</code> column
	 */
	@Override
	public Set<String> selectPausedTriggerGroups(Connection conn) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		HashSet<String> pausedGroups = new HashSet<String>();
		try {
			stmt = conn.prepareStatement(rtp(SELECT_PAUSED_TRIGGER_GROUPS));
			results = stmt.executeQuery();

			for (; results.next();) {
				pausedGroups.add(results.getString(COL_TRIGGER_GROUP));
			}
			return pausedGroups;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the scheduler state record for the given instance id, or all
	 * scheduler state records if the instance id is <code>null</code>.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param instanceId
	 *          the scheduler instance id, or <code>null</code> to select all
	 *          records
	 * @return a List of SchedulerStateRecord objects.
	 */
	@Override
	public List<SchedulerStateRecord> selectSchedulerStateRecords(Connection conn, String instanceId) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;
		try {
			List<SchedulerStateRecord> records = new LinkedList<SchedulerStateRecord>();

			if (instanceId == null) {
				stmt = conn.prepareStatement(rtp(SELECT_SCHEDULER_STATES));
			} else {
				stmt = conn.prepareStatement(rtp(SELECT_SCHEDULER_STATE));
				stmt.setString(1, instanceId);
			}
			results = stmt.executeQuery();

			for (; results.next();) {
				SchedulerStateRecord record = new SchedulerStateRecord();

				record.setSchedulerInstanceId(results.getString(COL_INSTANCE_NAME));
				record.setCheckinTimestamp(results.getLong(COL_LAST_CHECKIN_TIME));
				record.setCheckinInterval(results.getLong(COL_CHECKIN_INTERVAL));

				records.add(record);
			}

			return records;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select a trigger.
	 * </p>
	 * 
	 * <p>
	 * The base trigger row is read first. Depending on the stored trigger type
	 * the trigger is then either deserialized from the blob table
	 * (<code>TTYPE_BLOB</code>) or rebuilt from the base row plus the extended
	 * properties loaded by the matching
	 * <code>TriggerPersistenceDelegate</code>. In the blob case the
	 * deserialized object is returned as-is; the values read from the base row
	 * are not applied to it.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param triggerKey
	 *          the key (name and group) of the trigger to load
	 * @return the <code>Trigger</code> object, or <code>null</code> if no base
	 *         row (or, for blob triggers, no blob row) exists
	 * @throws ClassNotFoundException
	 *           if a class needed during deserialization cannot be found
	 * @throws IOException
	 *           if deserialization causes an error
	 * @throws JobPersistenceException
	 *           if no persistence delegate is registered for the stored
	 *           trigger type
	 */
	@Override
	public Trigger selectTrigger(Connection conn, TriggerKey triggerKey) throws SQLException, ClassNotFoundException, IOException, JobPersistenceException {
		PreparedStatement ps = null;
		ResultSet rs = null;

		try {
			Trigger trigger = null;

			ps = conn.prepareStatement(rtp(SELECT_TRIGGER));
			ps.setString(1, triggerKey.getName());
			ps.setString(2, triggerKey.getGroup());
			rs = ps.executeQuery();

			if (rs.next()) {
				String jobName = rs.getString(COL_JOB_NAME);
				String jobGroup = rs.getString(COL_JOB_GROUP);
				String description = rs.getString(COL_DESCRIPTION);
				long nextFireTime = rs.getLong(COL_NEXT_FIRE_TIME);
				long prevFireTime = rs.getLong(COL_PREV_FIRE_TIME);
				String triggerType = rs.getString(COL_TRIGGER_TYPE);
				long startTime = rs.getLong(COL_START_TIME);
				long endTime = rs.getLong(COL_END_TIME);
				String calendarName = rs.getString(COL_CALENDAR_NAME);
				int misFireInstr = rs.getInt(COL_MISFIRE_INSTRUCTION);
				int priority = rs.getInt(COL_PRIORITY);

				Map<?, ?> map = null;
				if (canUseProperties()) {
					map = getMapFromProperties(rs);
				} else {
					map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
				}

				// Non-positive stored times mean "not set" for next/previous fire time and end time.
				Date nft = null;
				if (nextFireTime > 0) {
					nft = new Date(nextFireTime);
				}

				Date pft = null;
				if (prevFireTime > 0) {
					pft = new Date(prevFireTime);
				}
				Date startTimeD = new Date(startTime);
				Date endTimeD = null;
				if (endTime > 0) {
					endTimeD = new Date(endTime);
				}

				// Release the base-row resources before issuing the follow-up queries.
				// Both references are cleared so the finally block does not close
				// the already-closed statement a second time.
				rs.close();
				rs = null;
				ps.close();
				ps = null;

				if (triggerType.equals(TTYPE_BLOB)) {
					ps = conn.prepareStatement(rtp(SELECT_BLOB_TRIGGER));
					ps.setString(1, triggerKey.getName());
					ps.setString(2, triggerKey.getGroup());
					rs = ps.executeQuery();

					if (rs.next()) {
						trigger = (Trigger) getObjectFromBlob(rs, COL_BLOB);
					}
				} else {
					TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(triggerType);

					if (tDel == null)
						throw new JobPersistenceException("No TriggerPersistenceDelegate for trigger discriminator type: " + triggerType);

					TriggerPropertyBundle triggerProps = tDel.loadExtendedTriggerProperties(conn, triggerKey);

					TriggerBuilder tb = newTrigger().withDescription(description).withPriority(priority).startAt(startTimeD).endAt(endTimeD).withIdentity(triggerKey).modifiedByCalendar(calendarName).withSchedule(triggerProps.getScheduleBuilder())
							.forJob(jobKey(jobName, jobGroup));

					if (null != map) {
						tb.usingJobData(new WorkDataMap(map));
					}

					trigger = tb.build();

					// State that the builder does not cover is applied after the trigger is built.
					trigger.setMisfireInstruction(misFireInstr);
					trigger.setNextFireTime(nft);
					trigger.setPreviousFireTime(pft);

					setTriggerStateProperties(trigger, triggerProps);
				}
			}

			return trigger;
		} finally {
			closeResultSet(rs);
			closeStatement(ps);
		}
	}

	/**
	 * <p>
	 * Select the trigger in the <code>STATE_WAITING</code> state that will be
	 * fired at the given fire time.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param fireTime
	 *          the time that the trigger will be fired
	 * @return a <code>Key</code> holding the name and group of the trigger that
	 *         will be fired at the given fire time, or null if the query
	 *         returns no row
	 */
	@Override
	public Key selectTriggerForFireTime(Connection conn, long fireTime) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;
		try {
			stmt = conn.prepareStatement(rtp(SELECT_TRIGGER_FOR_FIRE_TIME));
			stmt.setString(1, STATE_WAITING);
			stmt.setBigDecimal(2, new BigDecimal(String.valueOf(fireTime)));
			results = stmt.executeQuery();

			if (!results.next()) {
				return null;
			}
			return new Key(results.getString(COL_TRIGGER_NAME), results.getString(COL_TRIGGER_GROUP));
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select all of the trigger group names that are stored.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return a List of <code>String</code> group names, taken from the first
	 *         column of each result row
	 */
	@Override
	public List<String> selectTriggerGroups(Connection conn) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_TRIGGER_GROUPS));
			results = stmt.executeQuery();

			List<String> groupNames = new LinkedList<String>();
			for (; results.next();) {
				groupNames.add(results.getString(1));
			}
			return groupNames;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the trigger group names that match the given matcher.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param matcher
	 *          the groupMatcher to filter by; it is converted to a SQL LIKE
	 *          clause via <code>toSqlLikeClause</code>
	 * @return a List of <code>String</code> group names, taken from the first
	 *         column of each result row
	 */
	@Override
	public List<String> selectTriggerGroups(Connection conn, GroupMatcher<TriggerKey> matcher) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_TRIGGER_GROUPS_FILTERED));
			stmt.setString(1, toSqlLikeClause(matcher));
			results = stmt.executeQuery();

			List<String> groupNames = new LinkedList<String>();
			for (; results.next();) {
				groupNames.add(results.getString(1));
			}
			return groupNames;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select a trigger's data map.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param triggerName
	 *          the name of the trigger
	 * @param groupName
	 *          the group containing the trigger
	 * @return the <code>WorkDataMap</code> of the Trigger, never null, but
	 *         possibly empty (when no row exists or no map was stored).
	 * @throws ClassNotFoundException
	 *           if a class needed during deserialization cannot be found
	 * @throws IOException
	 *           if deserialization causes an error
	 */
	@Override
	public WorkDataMap selectTriggerJobDataMap(Connection conn, String triggerName, String groupName) throws SQLException, ClassNotFoundException, IOException {

		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_TRIGGER_DATA));
			stmt.setString(1, triggerName);
			stmt.setString(2, groupName);
			results = stmt.executeQuery();

			if (results.next()) {

				// The map comes either from a serialized blob or from the properties representation.
				Map<?, ?> stored = null;
				if (!canUseProperties()) {
					stored = (Map<?, ?>) getObjectFromBlob(results, COL_JOB_DATAMAP);
				} else {
					stored = getMapFromProperties(results);
				}

				results.close();
				stmt.close();

				if (stored != null) {
					return new WorkDataMap(stored);
				}
			}
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}

		// no row, or a row without a stored map
		return new WorkDataMap();
	}

	/**
	 * <p>
	 * Get the keys of all of the triggers associated with the given job.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobKey
	 *          the key (name and group) of the job
	 * @return a List of <code>TriggerKey</code> objects
	 */
	@Override
	public List<TriggerKey> selectTriggerKeysForJob(Connection conn, WorkKey jobKey) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_TRIGGERS_FOR_JOB));
			stmt.setString(1, jobKey.getName());
			stmt.setString(2, jobKey.getGroup());
			results = stmt.executeQuery();

			List<TriggerKey> keys = new LinkedList<TriggerKey>();
			for (; results.next();) {
				String name = results.getString(COL_TRIGGER_NAME);
				String group = results.getString(COL_TRIGGER_GROUP);
				keys.add(triggerKey(name, group));
			}
			return keys;
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the triggers that reference the given calendar.
	 * </p>
	 * 
	 * <p>
	 * Each key returned by the <code>SELECT_TRIGGERS_FOR_CALENDAR</code> query
	 * is loaded through <code>selectTrigger</code>. Keys for which
	 * <code>selectTrigger</code> returns <code>null</code> are skipped, so the
	 * returned list never contains <code>null</code> elements (matching
	 * <code>selectTriggersForJob</code>).
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param calName
	 *          the calendar name bound to the query
	 * @return a List of <code>Trigger</code> objects, possibly empty
	 * @throws ClassNotFoundException
	 *           if a class needed during deserialization cannot be found
	 * @throws IOException
	 *           if deserialization causes an error
	 * @throws JobPersistenceException
	 *           if a trigger's type has no registered persistence delegate
	 */
	@Override
	public List<Trigger> selectTriggersForCalendar(Connection conn, String calName) throws SQLException, ClassNotFoundException, IOException, JobPersistenceException {

		LinkedList<Trigger> trigList = new LinkedList<Trigger>();
		PreparedStatement ps = null;
		ResultSet rs = null;

		try {
			ps = conn.prepareStatement(rtp(SELECT_TRIGGERS_FOR_CALENDAR));
			ps.setString(1, calName);
			rs = ps.executeQuery();

			while (rs.next()) {
				Trigger t = selectTrigger(conn, triggerKey(rs.getString(COL_TRIGGER_NAME), rs.getString(COL_TRIGGER_GROUP)));
				// selectTrigger returns null when the trigger row can no longer be found
				if (t != null) {
					trigList.add(t);
				}
			}
		} finally {
			closeResultSet(rs);
			closeStatement(ps);
		}

		return trigList;
	}

	/**
	 * <p>
	 * Select the triggers for a job.
	 * </p>
	 * 
	 * <p>
	 * Each key returned by the <code>SELECT_TRIGGERS_FOR_JOB</code> query is
	 * loaded through <code>selectTrigger</code>; keys that yield
	 * <code>null</code> are skipped.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobKey
	 *          the key (name and group) of the job
	 * @return a List of <code>Trigger</code> objects associated with the given
	 *         job.
	 * @throws SQLException
	 * @throws JobPersistenceException
	 */
	@Override
	public List<Trigger> selectTriggersForJob(Connection conn, WorkKey jobKey) throws SQLException, ClassNotFoundException, IOException, JobPersistenceException {

		LinkedList<Trigger> triggers = new LinkedList<Trigger>();
		PreparedStatement stmt = null;
		ResultSet results = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_TRIGGERS_FOR_JOB));
			stmt.setString(1, jobKey.getName());
			stmt.setString(2, jobKey.getGroup());
			results = stmt.executeQuery();

			for (; results.next();) {
				Trigger loaded = selectTrigger(conn, triggerKey(results.getString(COL_TRIGGER_NAME), results.getString(COL_TRIGGER_GROUP)));
				if (loaded == null) {
					continue;
				}
				triggers.add(loaded);
			}
		} finally {
			closeResultSet(results);
			closeStatement(stmt);
		}

		return triggers;
	}

	/**
	 * <p>
	 * Select the fired-trigger records of this scheduler instance that request
	 * recovery and turn each of them into a recovery trigger.
	 * </p>
	 * 
	 * <p>
	 * The query is bound with this delegate's <code>instanceId</code> and the
	 * boolean <code>true</code>. For every row the job name/group, trigger
	 * name/group, fired time (<code>COL_FIRED_TIME</code>) and priority are
	 * read.
	 * </p>
	 * 
	 * <p>
	 * Judging by the commented-out code, the intent is to create one trigger
	 * per row named <code>"recover_" + instanceId + "_" + counter</code> in
	 * <code>GoDomain.DEFAULT_RECOVERY_GROUP</code>, starting at the original
	 * fired time, and to attach a job data map carrying the original trigger
	 * name, group and fire time.
	 * </p>
	 * 
	 * <p>
	 * NOTE(review): the construction of the recovery trigger is currently
	 * commented out, so <code>rcvryTrig</code> is always <code>null</code> and
	 * the first row returned by the query makes
	 * <code>rcvryTrig.setPriority(...)</code> throw a
	 * <code>NullPointerException</code>. The method only completes normally
	 * when the query returns no rows. A concrete <code>Trigger</code>
	 * implementation has to be instantiated here before this can work.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return a list of recovery <code>Trigger</code> objects (empty when no
	 *         fired trigger requests recovery)
	 * @throws SQLException
	 * @throws IOException
	 * @throws ClassNotFoundException
	 */
	@Override
	public List<Trigger> selectTriggersForRecoveringJobs(Connection conn) throws SQLException, IOException, ClassNotFoundException {
		PreparedStatement ps = null;
		ResultSet rs = null;

		try {
			ps = conn.prepareStatement(rtp(SELECT_INSTANCES_RECOVERABLE_FIRED_TRIGGERS));
			ps.setString(1, instanceId);
			setBoolean(ps, 2, true);
			rs = ps.executeQuery();

			// seed for the numeric suffix of the recovery trigger names; only used by the commented-out constructor call below
			long dumId = System.currentTimeMillis();
			LinkedList<Trigger> list = new LinkedList<Trigger>();
			while (rs.next()) {
				// jobName / jobGroup are only consumed by the commented-out setters below
				String jobName = rs.getString(COL_JOB_NAME);
				String jobGroup = rs.getString(COL_JOB_GROUP);
				String trigName = rs.getString(COL_TRIGGER_NAME);
				String trigGroup = rs.getString(COL_TRIGGER_GROUP);
				long firedTime = rs.getLong(COL_FIRED_TIME);
				int priority = rs.getInt(COL_PRIORITY);
				// NOTE(review): never assigned a real instance -> the setPriority call below dereferences null
				Trigger rcvryTrig = null;
				//new RepeatTrigger("recover_" + instanceId + "_" + String.valueOf(dumId++), GoDomain.DEFAULT_RECOVERY_GROUP, new Date(firedTime));
				//rcvryTrig.setJobName(jobName);
				//rcvryTrig.setJobGroup(jobGroup);
				rcvryTrig.setPriority(priority);
				rcvryTrig.setMisfireInstruction(GoDomain.MISFIRE_INSTRUCTION_FIRE_NOW);

				// remember which trigger originally fired the job; the map is built but not attached (setter commented out)
				WorkDataMap jd = selectTriggerJobDataMap(conn, trigName, trigGroup);
				jd.put(GoDomain.FAILED_JOB_ORIGINAL_TRIGGER_NAME, trigName);
				jd.put(GoDomain.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, trigGroup);
				jd.put(GoDomain.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(firedTime));
				//rcvryTrig.setJobDataMap(jd);

				list.add(rcvryTrig);
			}
			return list;
		} finally {
			closeResultSet(rs);
			closeStatement(ps);
		}
	}

	/**
	 * <p>
	 * Select the keys of all triggers whose group satisfies the given matcher.
	 * </p>
	 * 
	 * <p>
	 * The matcher is converted with <code>toSqlLikeClause</code> and bound as
	 * the single query parameter; the first two result columns are used as the
	 * arguments of <code>triggerKey</code>.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param matcher
	 *          to evaluate against known triggers
	 * @return a (never null, possibly empty) Set of <code>TriggerKey</code>s
	 * @throws SQLException
	 */
	@Override
	public Set<TriggerKey> selectTriggersInGroup(Connection conn, GroupMatcher<TriggerKey> matcher) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet rows = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_TRIGGERS_IN_GROUP));
			stmt.setString(1, toSqlLikeClause(matcher));
			rows = stmt.executeQuery();

			final Set<TriggerKey> matched = new HashSet<TriggerKey>();
			while (rows.next()) {
				final String first = rows.getString(1);
				final String second = rows.getString(2);
				matched.add(triggerKey(first, second));
			}
			return matched;
		} finally {
			closeResultSet(rows);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Select the keys of all triggers that are in the given state.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param state
	 *          the state the triggers must be in
	 * @return a (never null, possibly empty) list of trigger keys, in the order
	 *         the rows were returned
	 * @throws SQLException
	 */
	@Override
	public List<TriggerKey> selectTriggersInState(Connection conn, String state) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet rows = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_TRIGGERS_IN_STATE));
			stmt.setString(1, state);
			rows = stmt.executeQuery();

			final LinkedList<TriggerKey> inState = new LinkedList<TriggerKey>();
			while (rows.next()) {
				final String first = rows.getString(1);
				final String second = rows.getString(2);
				inState.add(triggerKey(first, second));
			}
			return inState;
		} finally {
			closeResultSet(rows);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Overload of the trigger-state lookup taking a generic
	 * <code>Key&lt;TriggerKey&gt;</code>.
	 * </p>
	 * 
	 * <p>
	 * NOTE(review): this overload is an unimplemented stub. It runs no query
	 * and always returns <code>null</code>; use
	 * <code>selectTriggerState(Connection, TriggerKey)</code> to read the state
	 * from the database.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection (unused)
	 * @param key
	 *          the key of the trigger (unused)
	 * @return always <code>null</code>
	 */
	@Override
	public String selectTriggerState(Connection conn, Key<TriggerKey> key) {
		return null;
	}

	//---------------------------------------------------------------------------
	// protected methods that can be overridden by subclasses
	//---------------------------------------------------------------------------

	/**
	 * <p>
	 * Select a trigger's status (state &amp; next fire time).
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @return a <code>TriggerStatus</code> object, or null
	 */

	/**
	 * <p>
	 * Select the state value of a trigger.
	 * </p>
	 * 
	 * <p>
	 * When no row exists for the trigger, <code>STATE_DELETED</code> is
	 * reported. The returned string is interned.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param triggerKey
	 *          the key (name and group) of the trigger
	 * @return the stored state of the trigger, or <code>STATE_DELETED</code>
	 *         if the trigger is not found
	 * @throws SQLException
	 */
	@Override
	public String selectTriggerState(Connection conn, TriggerKey triggerKey) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet rows = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_TRIGGER_STATE));
			stmt.setString(1, triggerKey.getName());
			stmt.setString(2, triggerKey.getGroup());
			rows = stmt.executeQuery();

			// a missing row is reported as the "deleted" state
			final String found = rows.next() ? rows.getString(COL_TRIGGER_STATE) : STATE_DELETED;
			return found.intern();
		} finally {
			closeResultSet(rows);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Overload of the trigger-status lookup taking a generic
	 * <code>Key&lt;TriggerKey&gt;</code>.
	 * </p>
	 * 
	 * <p>
	 * NOTE(review): unimplemented stub; no query is executed and the result is
	 * always <code>null</code>.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection (unused)
	 * @param key
	 *          the key of the trigger (unused)
	 * @return always <code>null</code>
	 */
	@Override
	public TriggerStatus selectTriggerStatus(Connection conn, Key<TriggerKey> key) {
		return null;
	}

	/**
	 * <p>
	 * Select a trigger's status.
	 * </p>
	 * 
	 * <p>
	 * NOTE(review): unimplemented stub; despite the declared
	 * <code>SQLException</code>, no query is executed and the result is always
	 * <code>null</code>, so callers cannot distinguish "not found" from "not
	 * implemented".
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection (unused)
	 * @param triggerKey
	 *          the key of the trigger (unused)
	 * @return always <code>null</code>
	 * @throws SQLException
	 *           declared but never thrown by this implementation
	 */
	@Override
	public TriggerStatus selectTriggerStatus(Connection conn, TriggerKey triggerKey) throws SQLException {
		return null;
	}

	/**
	 * <p>
	 * Select the keys of the next triggers to fire within the given time
	 * window. Only triggers in <code>STATE_WAITING</code> are considered and at
	 * most five keys are returned, in the order produced by the query.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param noLaterThan
	 *          upper bound for <code>getNextFireTime()</code> of the triggers,
	 *          in milliseconds (second query parameter)
	 * @param noEarlierThan
	 *          lower bound for <code>getNextFireTime()</code> of the triggers,
	 *          in milliseconds (third query parameter)
	 *          
	 * @return A (never null, possibly empty) list of the identifiers (Key objects) of the next triggers to be fired.
	 * @throws SQLException
	 */
	@Override
	public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan) throws SQLException {
		final int maxCount = 5;
		final List<TriggerKey> acquired = new LinkedList<TriggerKey>();
		PreparedStatement stmt = null;
		ResultSet rows = null;
		try {
			stmt = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));

			// Hint to the JDBC driver that only a handful of rows is needed,
			// so it does not transfer more than necessary.
			stmt.setFetchSize(maxCount);
			stmt.setMaxRows(maxCount);

			final BigDecimal upperBound = new BigDecimal(String.valueOf(noLaterThan));
			final BigDecimal lowerBound = new BigDecimal(String.valueOf(noEarlierThan));
			stmt.setString(1, STATE_WAITING);
			stmt.setBigDecimal(2, upperBound);
			stmt.setBigDecimal(3, lowerBound);
			rows = stmt.executeQuery();

			while (rows.next() && acquired.size() < maxCount) {
				final String name = rows.getString(COL_TRIGGER_NAME);
				final String group = rows.getString(COL_TRIGGER_GROUP);
				acquired.add(triggerKey(name, group));
			}

			return acquired;
		} finally {
			closeResultSet(rows);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Check whether a trigger with the given key is stored.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param triggerKey
	 *          the key (name and group) of the trigger to look for
	 * @return true if the existence query returns at least one row, false
	 *         otherwise
	 * @throws SQLException
	 */
	@Override
	public boolean triggerExists(Connection conn, TriggerKey triggerKey) throws SQLException {
		PreparedStatement stmt = null;
		ResultSet rows = null;

		try {
			stmt = conn.prepareStatement(rtp(SELECT_TRIGGER_EXISTENCE));
			stmt.setString(1, triggerKey.getName());
			stmt.setString(2, triggerKey.getGroup());
			rows = stmt.executeQuery();

			// any row at all means the trigger exists
			return rows.next();
		} finally {
			closeResultSet(rows);
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Update the blob trigger data: the trigger is Java-serialized and the
	 * resulting bytes are written to the row identified by the trigger's key.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param trigger
	 *          the trigger whose serialized form is stored
	 * @return the number of rows updated
	 * @throws SQLException
	 * @throws IOException
	 *           if the trigger cannot be serialized
	 */
	public int updateBlobTrigger(Connection conn, Trigger trigger) throws SQLException, IOException {
		final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		PreparedStatement stmt = null;

		try {
			// serialize the whole trigger into memory
			final ObjectOutputStream objectOut = new ObjectOutputStream(buffer);
			objectOut.writeObject(trigger);
			objectOut.close();

			final byte[] serialized = buffer.toByteArray();
			final ByteArrayInputStream blobSource = new ByteArrayInputStream(serialized);

			stmt = conn.prepareStatement(rtp(UPDATE_BLOB_TRIGGER));
			stmt.setBinaryStream(1, blobSource, serialized.length);
			stmt.setString(2, trigger.getKey().getName());
			stmt.setString(3, trigger.getKey().getGroup());

			return stmt.executeUpdate();
		} finally {
			closeStatement(stmt);
			buffer.close();
		}
	}

	/**
	 * <p>
	 * Update a stored calendar with the serialized form of the given calendar
	 * object.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param calendarName
	 *          the name of the calendar row to update
	 * @param calendar
	 *          the calendar to serialize and store
	 * @return the number of rows updated
	 * @throws IOException
	 *           if there were problems serializing the calendar
	 * @throws SQLException
	 */
	@Override
	public int updateCalendar(Connection conn, String calendarName, Calendar calendar) throws IOException, SQLException {
		// serialize first: a failure here happens before any statement is opened
		final ByteArrayOutputStream serialized = serializeObject(calendar);

		PreparedStatement stmt = null;
		try {
			stmt = conn.prepareStatement(rtp(UPDATE_CALENDAR));
			setBytes(stmt, 1, serialized);
			stmt.setString(2, calendarName);

			final int affected = stmt.executeUpdate();
			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Update a fired trigger record, identified by the trigger's fire instance
	 * id.
	 * </p>
	 * 
	 * <p>
	 * The record is stamped with this delegate's <code>instanceId</code>, the
	 * trigger's next fire time and the given state. When a job is supplied its
	 * key and concurrency flag are stored; otherwise the job columns are set
	 * to <code>null</code>/<code>false</code>.
	 * </p>
	 * 
	 * <p>
	 * The requests-recovery column (parameter 7) is always written as
	 * <code>false</code>: the <code>job.requestsRecovery()</code> lookup is
	 * disabled in this code base, and leaving the parameter unbound (as the
	 * job branch previously did) makes the statement fail at execution time.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param trigger
	 *          the trigger; its next fire time must not be <code>null</code>
	 * @param state
	 *          the state that the trigger should be stored in
	 * @param job
	 *          the job fired by the trigger, may be <code>null</code>
	 * @return the number of rows updated
	 * @throws SQLException
	 */
	@Override
	public int updateFiredTrigger(Connection conn, Trigger trigger, String state, Work job) throws SQLException {
		PreparedStatement ps = null;
		try {
			ps = conn.prepareStatement(rtp(UPDATE_FIRED_TRIGGER));

			ps.setString(1, instanceId);

			ps.setBigDecimal(2, new BigDecimal(String.valueOf(trigger.getNextFireTime().getTime())));
			ps.setString(3, state);

			if (job != null) {
				ps.setString(4, trigger.getJobKey().getName());
				ps.setString(5, trigger.getJobKey().getGroup());
				setBoolean(ps, 6, job.isConcurrentExectionDisallowed());
			} else {
				ps.setString(4, null);
				ps.setString(5, null);
				setBoolean(ps, 6, false);
			}
			// Every placeholder must be bound before execution. The recovery flag
			// cannot be read from the job here (setBoolean(ps, 7, job.requestsRecovery())
			// is disabled), so it is stored as false in both cases.
			setBoolean(ps, 7, false);

			ps.setString(8, trigger.getFireInstanceId());

			return ps.executeUpdate();
		} finally {
			closeStatement(ps);
		}
	}

	/**
	 * <p>
	 * Update the job data column of the given job.
	 * </p>
	 * 
	 * <p>
	 * Serialization of the job's data map is currently disabled
	 * (<code>serializeJobData(job.getJobDataMap())</code> is commented out), so
	 * a <code>null</code> stream is always handed to <code>setBytes</code>.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param job
	 *          the job to update
	 * @return the number of rows updated
	 * @throws IOException
	 * @throws SQLException
	 */
	@Override
	public int updateJobData(Connection conn, Work job) throws IOException, SQLException {
		// serializeJobData(job.getJobDataMap()) is disabled; nothing is serialized
		final ByteArrayOutputStream payload = null;

		PreparedStatement stmt = null;
		try {
			stmt = conn.prepareStatement(rtp(UPDATE_JOB_DATA));

			int idx = 0;
			setBytes(stmt, ++idx, payload);
			stmt.setString(++idx, job.getKey().getName());
			stmt.setString(++idx, job.getKey().getGroup());

			final int affected = stmt.executeUpdate();
			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Update the job detail record identified by the job's key.
	 * </p>
	 * 
	 * <p>
	 * Bound parameters: 1 = description, 2 = work class name, 4 = whether
	 * concurrent execution is disallowed, 7 = job data bytes, 8/9 = job name
	 * and group.
	 * </p>
	 * 
	 * <p>
	 * NOTE(review): parameters 3, 5 and 6 are never bound because the
	 * corresponding calls (<code>isDurable</code>,
	 * <code>isPersistJobDataAfterExecution</code>,
	 * <code>requestsRecovery</code>) are commented out. JDBC drivers typically
	 * reject execution of a statement with unset parameters, so this method
	 * presumably fails with a <code>SQLException</code> until those values are
	 * supplied - confirm against the driver in use. The job data stream is
	 * also always <code>null</code> since its serialization is commented out.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param job
	 *          the job to update
	 * @return number of rows updated
	 * @throws IOException
	 *           declared for serialization problems of the job data map
	 *           (serialization is currently disabled)
	 * @throws SQLException
	 */
	@Override
	public int updateJobDetail(Connection conn, Work job) throws IOException, SQLException {
		// always null: the serialization call below is disabled
		ByteArrayOutputStream baos = null;
		//serializeJobData(job.getJobDataMap());

		PreparedStatement ps = null;

		int insertResult = 0;

		try {
			ps = conn.prepareStatement(rtp(UPDATE_JOB_DETAIL));
			ps.setString(1, job.getDescription());
			ps.setString(2, job.getWorkClass().getName());
			// NOTE(review): parameter 3 is left unbound
			//setBoolean(ps, 3, job.isDurable());
			setBoolean(ps, 4, job.isConcurrentExectionDisallowed());
			// NOTE(review): parameters 5 and 6 are left unbound
			//setBoolean(ps, 5, job.isPersistJobDataAfterExecution());
			//setBoolean(ps, 6, job.requestsRecovery());
			setBytes(ps, 7, baos);
			ps.setString(8, job.getKey().getName());
			ps.setString(9, job.getKey().getGroup());

			insertResult = ps.executeUpdate();
		} finally {
			closeStatement(ps);
		}

		return insertResult;
	}

	/**
	 * <p>
	 * Update the check-in time stored for a scheduler instance.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param instanceId
	 *          the id of the scheduler instance whose record is updated
	 * @param checkInTime
	 *          the new check-in time
	 * @return the number of rows updated
	 * @throws SQLException
	 */
	@Override
	public int updateSchedulerState(Connection conn, String instanceId, long checkInTime) throws SQLException {
		PreparedStatement stmt = null;
		try {
			stmt = conn.prepareStatement(rtp(UPDATE_SCHEDULER_STATE));
			stmt.setLong(1, checkInTime);
			stmt.setString(2, instanceId);

			final int affected = stmt.executeUpdate();
			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Update the base trigger data, then the type-specific data.
	 * </p>
	 * 
	 * <p>
	 * The job data blob is only written when the trigger's job data map is
	 * dirty; otherwise the statement variant without the data column is used.
	 * A missing next or previous fire time is stored as <code>-1</code>, a
	 * missing end time as <code>0</code>. After the base row is updated, the
	 * extended properties are written by the matching
	 * <code>TriggerPersistenceDelegate</code>, or by
	 * <code>updateBlobTrigger</code> when no delegate handles the trigger.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param trigger
	 *          the trigger to update
	 * @param state
	 *          the state that the trigger should be stored in
	 * @param jobDetail
	 *          the job of the trigger, passed on to the persistence delegate
	 * @return the number of rows updated in the base trigger table
	 * @throws SQLException
	 * @throws IOException
	 */
	@Override
	public int updateTrigger(Connection conn, Trigger trigger, String state, Work jobDetail) throws SQLException, IOException {

		// save some clock cycles by not writing the job data blob unnecessarily ...
		final boolean dataDirty = trigger.getJobDataMap().isDirty();
		ByteArrayOutputStream dataBytes = null;
		if (dataDirty && trigger.getJobDataMap().size() > 0) {
			dataBytes = serializeJobData(trigger.getJobDataMap());
		}

		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(dataDirty ? UPDATE_TRIGGER : UPDATE_TRIGGER_SKIP_DATA));

			stmt.setString(1, trigger.getJobKey().getName());
			stmt.setString(2, trigger.getJobKey().getGroup());
			stmt.setString(3, trigger.getDescription());

			final long nextMillis = (trigger.getNextFireTime() != null) ? trigger.getNextFireTime().getTime() : -1;
			stmt.setBigDecimal(4, new BigDecimal(String.valueOf(nextMillis)));

			final long previousMillis = (trigger.getPreviousFireTime() != null) ? trigger.getPreviousFireTime().getTime() : -1;
			stmt.setBigDecimal(5, new BigDecimal(String.valueOf(previousMillis)));

			stmt.setString(6, state);

			// triggers without a dedicated delegate are stored as blobs
			final TriggerPersistenceDelegate delegate = findTriggerPersistenceDelegate(trigger);
			final String discriminator = (delegate != null) ? delegate.getHandledTriggerTypeDiscriminator() : TTYPE_BLOB;
			stmt.setString(7, discriminator);

			stmt.setBigDecimal(8, new BigDecimal(String.valueOf(trigger.getStartTime().getTime())));

			final long endMillis = (trigger.getEndTime() != null) ? trigger.getEndTime().getTime() : 0;
			stmt.setBigDecimal(9, new BigDecimal(String.valueOf(endMillis)));

			stmt.setString(10, trigger.getCalendarName());
			stmt.setInt(11, trigger.getMisfireInstruction());
			stmt.setInt(12, trigger.getPriority());

			// the key parameters shift by one when the data blob is part of the statement
			int idx = 13;
			if (dataDirty) {
				setBytes(stmt, idx++, dataBytes);
			}
			stmt.setString(idx++, trigger.getKey().getName());
			stmt.setString(idx, trigger.getKey().getGroup());

			final int affected = stmt.executeUpdate();

			if (delegate != null) {
				delegate.updateExtendedTriggerProperties(conn, trigger, state, jobDetail);
			} else {
				updateBlobTrigger(conn, trigger);
			}

			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Move all triggers of the matching group(s) to the given new state,
	 * provided they are currently in the given old state.
	 * </p>
	 * 
	 * @param conn
	 *          the DB connection
	 * @param matcher
	 *          the groupMatcher to evaluate the triggers against
	 * @param newState
	 *          the new state for the trigger group
	 * @param oldState
	 *          the old state the triggers must be in
	 * @return int the number of rows updated
	 * @throws SQLException
	 */
	@Override
	public int updateTriggerGroupStateFromOtherState(Connection conn, GroupMatcher<TriggerKey> matcher, String newState, String oldState) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(UPDATE_TRIGGER_GROUP_STATE_FROM_STATE));

			int idx = 0;
			stmt.setString(++idx, newState);
			stmt.setString(++idx, toSqlLikeClause(matcher));
			stmt.setString(++idx, oldState);

			final int affected = stmt.executeUpdate();
			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Move all triggers of the matching group(s) to the given new state,
	 * provided they are currently in one of the three given old states.
	 * </p>
	 * 
	 * @param conn
	 *          the DB connection
	 * @param matcher
	 *          the groupMatcher to evaluate the triggers against
	 * @param newState
	 *          the new state for the triggers
	 * @param oldState1
	 *          one of the old states the trigger must be in
	 * @param oldState2
	 *          one of the old states the trigger must be in
	 * @param oldState3
	 *          one of the old states the trigger must be in
	 * @return int the number of rows updated
	 * @throws SQLException
	 */
	@Override
	public int updateTriggerGroupStateFromOtherStates(Connection conn, GroupMatcher<TriggerKey> matcher, String newState, String oldState1, String oldState2, String oldState3) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(UPDATE_TRIGGER_GROUP_STATE_FROM_STATES));

			int idx = 0;
			stmt.setString(++idx, newState);
			stmt.setString(++idx, toSqlLikeClause(matcher));
			stmt.setString(++idx, oldState1);
			stmt.setString(++idx, oldState2);
			stmt.setString(++idx, oldState3);

			final int affected = stmt.executeUpdate();
			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Overload of the trigger-state update taking a generic
	 * <code>Key&lt;TriggerKey&gt;</code>.
	 * </p>
	 * 
	 * <p>
	 * NOTE(review): unimplemented stub; the body is empty, so nothing is
	 * written to the database. Use
	 * <code>updateTriggerState(Connection, TriggerKey, String)</code> to
	 * actually change the state.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection (unused)
	 * @param key
	 *          the key of the trigger (unused)
	 * @param stateError
	 *          the state to store (unused)
	 */
	@Override
	public void updateTriggerState(Connection conn, Key<TriggerKey> key, String stateError) {

	}

	/**
	 * <p>
	 * Set the state of the trigger with the given key.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param triggerKey
	 *          the key (name and group) of the trigger to update
	 * @param state
	 *          the new state for the trigger
	 * @return the number of rows updated
	 * @throws SQLException
	 */
	@Override
	public int updateTriggerState(Connection conn, TriggerKey triggerKey, String state) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(UPDATE_TRIGGER_STATE));

			int idx = 0;
			stmt.setString(++idx, state);
			stmt.setString(++idx, triggerKey.getName());
			stmt.setString(++idx, triggerKey.getGroup());

			final int affected = stmt.executeUpdate();
			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Overload of the conditional trigger-state update taking a generic
	 * <code>Key&lt;TriggerKey&gt;</code>.
	 * </p>
	 * 
	 * <p>
	 * NOTE(review): unimplemented stub; the body is empty, so nothing is
	 * written to the database. The third and fourth parameter names suggest a
	 * different argument order than the <code>TriggerKey</code> overload
	 * (new state, old state) - confirm the intended contract before
	 * implementing.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection (unused)
	 * @param key
	 *          the key of the trigger (unused)
	 * @param stateWaiting
	 *          unused
	 * @param stateAcquired
	 *          unused
	 */
	@Override
	public void updateTriggerStateFromOtherState(Connection conn, Key<TriggerKey> key, String stateWaiting, String stateAcquired) {

	}

	/**
	 * <p>
	 * Move the given trigger to the new state, provided it is currently in the
	 * given old state.
	 * </p>
	 * 
	 * @param conn
	 *          the DB connection
	 * @param triggerKey
	 *          the key (name and group) of the trigger to update
	 * @param newState
	 *          the new state for the trigger
	 * @param oldState
	 *          the old state the trigger must be in
	 * @return int the number of rows updated
	 * @throws SQLException
	 */
	@Override
	public int updateTriggerStateFromOtherState(Connection conn, TriggerKey triggerKey, String newState, String oldState) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(UPDATE_TRIGGER_STATE_FROM_STATE));

			int idx = 0;
			stmt.setString(++idx, newState);
			stmt.setString(++idx, triggerKey.getName());
			stmt.setString(++idx, triggerKey.getGroup());
			stmt.setString(++idx, oldState);

			final int affected = stmt.executeUpdate();
			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Move the given trigger to the new state, provided it is currently in one
	 * of the three given old states.
	 * </p>
	 * 
	 * @param conn
	 *          the DB connection
	 * @param triggerKey
	 *          the key (name and group) of the trigger to update
	 * @param newState
	 *          the new state for the trigger
	 * @param oldState1
	 *          one of the old states the trigger must be in
	 * @param oldState2
	 *          one of the old states the trigger must be in
	 * @param oldState3
	 *          one of the old states the trigger must be in
	 * @return int the number of rows updated
	 * @throws SQLException
	 */
	@Override
	public int updateTriggerStateFromOtherStates(Connection conn, TriggerKey triggerKey, String newState, String oldState1, String oldState2, String oldState3) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(UPDATE_TRIGGER_STATE_FROM_STATES));

			int idx = 0;
			stmt.setString(++idx, newState);
			stmt.setString(++idx, triggerKey.getName());
			stmt.setString(++idx, triggerKey.getGroup());
			stmt.setString(++idx, oldState1);
			stmt.setString(++idx, oldState2);
			stmt.setString(++idx, oldState3);

			final int affected = stmt.executeUpdate();
			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Set the state of every trigger associated with the given job.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobKey
	 *          the key (name and group) of the job whose triggers are updated
	 * @param state
	 *          the new state for the triggers
	 * @return the number of rows updated
	 * @throws SQLException
	 */
	@Override
	public int updateTriggerStatesForJob(Connection conn, WorkKey jobKey, String state) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(UPDATE_JOB_TRIGGER_STATES));

			int idx = 0;
			stmt.setString(++idx, state);
			stmt.setString(++idx, jobKey.getName());
			stmt.setString(++idx, jobKey.getGroup());

			final int affected = stmt.executeUpdate();
			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Set the state of the triggers associated with the given job, restricted
	 * to those currently in the given old state.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param jobKey
	 *          the key (name and group) of the job whose triggers are updated
	 * @param state
	 *          the new state for the triggers
	 * @param oldState
	 *          the old state the triggers must be in
	 * @return the number of rows updated
	 * @throws SQLException
	 */
	@Override
	public int updateTriggerStatesForJobFromOtherState(Connection conn, WorkKey jobKey, String state, String oldState) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(UPDATE_JOB_TRIGGER_STATES_FROM_OTHER_STATE));

			int idx = 0;
			stmt.setString(++idx, state);
			stmt.setString(++idx, jobKey.getName());
			stmt.setString(++idx, jobKey.getGroup());
			stmt.setString(++idx, oldState);

			final int affected = stmt.executeUpdate();
			return affected;
		} finally {
			closeStatement(stmt);
		}
	}

	/**
	 * <p>
	 * Move all triggers that are in either of the two given old states to the
	 * given new state.
	 * </p>
	 * 
	 * @param conn
	 *          the DB Connection
	 * @param newState
	 *          the new state for the triggers
	 * @param oldState1
	 *          the first old state to update
	 * @param oldState2
	 *          the second old state to update
	 * @return number of rows updated
	 * @throws SQLException
	 */
	@Override
	public int updateTriggerStatesFromOtherStates(Connection conn, String newState, String oldState1, String oldState2) throws SQLException {
		PreparedStatement stmt = null;

		try {
			stmt = conn.prepareStatement(rtp(UPDATE_TRIGGER_STATES_FROM_OTHER_STATES));

			// bind the states in placeholder order: new state first, then the old ones
			final String[] binds = { newState, oldState1, oldState2 };
			for (int i = 0; i < binds.length; i++) {
				stmt.setString(i + 1, binds[i]);
			}

			return stmt.executeUpdate();
		} finally {
			closeStatement(stmt);
		}
	}

}
